diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-08-07 19:25:12 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-08-07 19:25:12 +0000 |
commit | f4dc59ea3028b87c1f8640df02c2a73b5cafcf1a (patch) | |
tree | 3e903ac9fb763cf6d1e055dad8d7cb403ef675ae | |
parent | 06e4488afee377347660ea481959dfbfb720ab47 (diff) | |
download | qpid-python-f4dc59ea3028b87c1f8640df02c2a73b5cafcf1a.tar.gz |
QPID-1213: Patch from rgodfrey to refactor AbstractJMSMessage and descendants to move AMQP version specific code into delegates and remove unnecessary conversion between 0-8 and 0-10 objects
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683683 13f79535-47bb-0310-9956-ffa450edef68
53 files changed, 2384 insertions, 1116 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index c6cd5da01d..67eb180dbf 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -24,16 +24,15 @@ import junit.framework.TestCase; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index 471912c85a..15449dc613 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -20,20 +20,15 @@ */ package org.apache.qpid.server.util; -import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.management.NoopManagedObjectRegistry; -import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.plugins.AllowAll; -import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 52080112c9..6c78b754bb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -65,9 +65,9 @@ public abstract class AMQDestination implements Destination, Referenceable private static final int IS_EXCLUSIVE_MASK = 0x2; private static final int IS_AUTODELETE_MASK = 0x4; - public static final Integer QUEUE_TYPE = Integer.valueOf(1); - public static final Integer TOPIC_TYPE = Integer.valueOf(2); - public static final Integer UNKNOWN_TYPE = Integer.valueOf(3); + public static final int QUEUE_TYPE = 1; + public static final int TOPIC_TYPE = 2; + public static final int UNKNOWN_TYPE = 3; protected AMQDestination(String url) throws URISyntaxException { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d2ef457c94..723e502ff0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -68,15 +68,7 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.JMSBytesMessage; -import org.apache.qpid.client.message.JMSMapMessage; -import org.apache.qpid.client.message.JMSObjectMessage; -import org.apache.qpid.client.message.JMSStreamMessage; -import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.ReturnMessage; -import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.client.state.AMQStateManager; @@ -109,6 +101,8 @@ import org.slf4j.LoggerFactory; */ public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { + + public static final class IdToConsumerMap { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -401,7 +395,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. */ - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, + protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { @@ -515,7 +509,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public BytesMessage createBytesMessage() throws JMSException { checkNotClosed(); - return new JMSBytesMessage(); + return new JMSBytesMessage(getMessageDelegateFactory()); } /** @@ -629,7 +623,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess // Ensure we only try and close an open session. if (!_closed.getAndSet(true)) { - synchronized (_connection.getFailoverMutex()) + synchronized (getFailoverMutex()) { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session. @@ -691,7 +685,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (!_closed.getAndSet(true)) { - synchronized (_connection.getFailoverMutex()) + synchronized (getFailoverMutex()) { synchronized (_messageDeliveryLock) { @@ -944,7 +938,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public MapMessage createMapMessage() throws JMSException { checkNotClosed(); - return new JMSMapMessage(); + return new JMSMapMessage(getMessageDelegateFactory()); } public javax.jms.Message createMessage() throws JMSException @@ -955,7 +949,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public ObjectMessage createObjectMessage() throws JMSException { checkNotClosed(); - return (ObjectMessage) new JMSObjectMessage(); + return (ObjectMessage) new JMSObjectMessage(getMessageDelegateFactory()); } public ObjectMessage createObjectMessage(Serializable object) throws JMSException @@ -1158,11 +1152,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. // We need to determin here if the connection should be - synchronized (_connection.getFailoverMutex()) + synchronized (getFailoverMutex()) { checkNotClosed(); - return new JMSStreamMessage(); + return new JMSStreamMessage(getMessageDelegateFactory()); } } @@ -1215,14 +1209,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public TextMessage createTextMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized (getFailoverMutex()) { checkNotClosed(); - return new JMSTextMessage(); + return new JMSTextMessage(getMessageDelegateFactory()); } } + protected Object getFailoverMutex() + { + return _connection.getFailoverMutex(); + } + public TextMessage createTextMessage(String text) throws JMSException { @@ -1459,7 +1458,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - abstract void sendRecover() throws AMQException, FailoverException; + protected abstract void sendRecover() throws AMQException, FailoverException; public void rejectMessage(UnprocessedMessage message, boolean requeue) { @@ -2181,7 +2180,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException; + protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException; /** * Declares the named exchange and type of exchange. @@ -2887,9 +2886,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - abstract boolean tagLE(long tag1, long tag2); + protected abstract boolean tagLE(long tag1, long tag2); + + protected abstract boolean updateRollbackMark(long current, long deliveryTag); - abstract boolean updateRollbackMark(long current, long deliveryTag); + public abstract AMQMessageDelegateFactory getMessageDelegateFactory(); /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index cf73bc5089..32a0fdd5f5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.util.Serial; import org.apache.qpid.nclient.Session; import org.apache.qpid.nclient.util.MessagePartListenerAdapter; @@ -773,7 +774,7 @@ public class AMQSession_0_10 extends AMQSession return subscriber; } - Long requestQueueDepth(AMQDestination amqd) + protected Long requestQueueDepth(AMQDestination amqd) { return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); } @@ -821,14 +822,19 @@ public class AMQSession_0_10 extends AMQSession } } - final boolean tagLE(long tag1, long tag2) + protected final boolean tagLE(long tag1, long tag2) { return Serial.le((int) tag1, (int) tag2); } - final boolean updateRollbackMark(long currentMark, long deliveryTag) + protected final boolean updateRollbackMark(long currentMark, long deliveryTag) { return Serial.lt((int) currentMark, (int) deliveryTag); } + public AMQMessageDelegateFactory getMessageDelegateFactory() + { + return AMQMessageDelegateFactory.FACTORY_0_10; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 3ccff410eb..2c88d6f557 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -29,6 +29,7 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; @@ -470,7 +471,7 @@ public final class AMQSession_0_8 extends AMQSession } - Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException + protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException { AMQFrame queueDeclare = getMethodRegistry().createQueueDeclareBody(getTicket(), @@ -486,14 +487,19 @@ public final class AMQSession_0_8 extends AMQSession return okHandler._messageCount; } - final boolean tagLE(long tag1, long tag2) + protected final boolean tagLE(long tag1, long tag2) { return tag1 <= tag2; } - final boolean updateRollbackMark(long currentMark, long deliveryTag) + protected final boolean updateRollbackMark(long currentMark, long deliveryTag) { return false; } + public AMQMessageDelegateFactory getMessageDelegateFactory() + { + return AMQMessageDelegateFactory.FACTORY_0_8; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index aaf56e8493..0176f66552 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -25,6 +25,7 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; @@ -683,7 +684,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me try { - AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame); + AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame); if (debug) { @@ -720,7 +721,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } } - public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H, B> messageFrame) + public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<H, B> messageFrame) throws Exception; /** @param jmsMessage this message has already been processed so can't redo preDeliver */ diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ab9e1d285f..388adfb434 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -24,12 +24,10 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; -import org.apache.qpid.jms.*; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.api.Message; import org.apache.qpid.transport.*; -import org.apache.qpid.transport.Session; import org.apache.qpid.QpidException; import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.filter.JMSSelectorFilter; @@ -269,12 +267,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } @Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage( - UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception + AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception { return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(), - messageFrame.getExchange(), messageFrame.getRoutingKey(), - messageFrame.getContentHeader(), messageFrame.getBodies(), - messageFrame.getReplyToURL()); + messageFrame.getContentHeader(), messageFrame.getBodies() + ); } // private methods diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 816bc430b2..e601f454a4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.client; -import java.util.concurrent.TimeUnit; - import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -31,6 +29,7 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.filter.JMSSelectorFilter; import org.apache.qpid.framing.AMQFrame; @@ -82,7 +81,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader } } - public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception { return _messageFactory.createMessage(messageFrame.getDeliveryTag(), diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 514b0f1213..b32cc86edb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -31,23 +31,14 @@ import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.ObjectMessage; -import javax.jms.Queue; import javax.jms.StreamMessage; import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.CompositeAMQDataBlock; import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; import org.slf4j.Logger; @@ -369,27 +360,27 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac if (message instanceof BytesMessage) { - newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage(); + newMessage = new MessageConverter(_session, (BytesMessage) message).getConvertedMessage(); } else if (message instanceof MapMessage) { - newMessage = new MessageConverter((MapMessage) message).getConvertedMessage(); + newMessage = new MessageConverter(_session, (MapMessage) message).getConvertedMessage(); } else if (message instanceof ObjectMessage) { - newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage(); + newMessage = new MessageConverter(_session, (ObjectMessage) message).getConvertedMessage(); } else if (message instanceof TextMessage) { - newMessage = new MessageConverter((TextMessage) message).getConvertedMessage(); + newMessage = new MessageConverter(_session, (TextMessage) message).getConvertedMessage(); } else if (message instanceof StreamMessage) { - newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage(); + newMessage = new MessageConverter(_session, (StreamMessage) message).getConvertedMessage(); } else { - newMessage = new MessageConverter(message).getConvertedMessage(); + newMessage = new MessageConverter(_session, message).getConvertedMessage(); } if (newMessage != null) @@ -460,7 +451,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac UUID messageId = null; if (_disableMessageId) { - message.setJMSMessageID(null); + message.setJMSMessageID((UUID)null); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 4e906b387d..2f06dc9d39 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -29,6 +29,7 @@ import javax.jms.DeliveryMode; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -72,9 +73,15 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer boolean immediate, boolean wait) throws JMSException { message.prepareForSending(); - if (message.get010Message() == null) + + AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate(); + + org.apache.qpid.api.Message underlyingMessage = message.get010Message(); + if (underlyingMessage == null) { - message.set010Message(new ByteBufferMessage()); + underlyingMessage = new ByteBufferMessage(delegate.getMessageProperties(), delegate.getDeliveryProperties()); + message.set010Message(underlyingMessage); + } // force a rebuild of the 0-10 message if data has changed if (message.getData() == null) @@ -82,8 +89,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer message.dataChanged(); } - DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties(); - MessageProperties messageProps = message.get010Message().getMessageProperties(); + DeliveryProperties deliveryProp = underlyingMessage.getDeliveryProperties(); + MessageProperties messageProps = underlyingMessage.getMessageProperties(); if (messageId != null) { @@ -144,20 +151,10 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryProp.setRoutingKey(routingKey); } - BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); - if (contentHeaderProperties.reset()) - { - // set the application properties - messageProps.setContentType(contentHeaderProperties.getContentType().toString()); - messageProps.setContentLength(message.getContentLength()); + messageProps.setContentLength(message.getContentLength()); - AMQShortString correlationID = contentHeaderProperties.getCorrelationId(); - if (correlationID != null) - { - messageProps.setCorrelationId(correlationID.getBytes()); - } - String replyToURL = contentHeaderProperties.getReplyToAsString(); + /* String replyToURL = contentHeaderProperties.getReplyToAsString(); if (replyToURL != null) { if(_logger.isDebugEnabled()) @@ -179,31 +176,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } messageProps.setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString())); } +*/ - Map<String,Object> map = null; - - if (contentHeaderProperties.getHeaders() != null) - { - //JMS_QPID_DESTTYPE is always set but useles so this is a temporary fix - contentHeaderProperties.getHeaders().remove(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); - map = FiledTableSupport.convertToMap(contentHeaderProperties.getHeaders()); - } - - AMQShortString type = contentHeaderProperties.getType(); - if (type != null) - { - if (map == null) - { - map = new HashMap<String,Object>(); - } - map.put(AbstractJMSMessage.JMS_TYPE, type.toString()); - } - - if (map != null) - { - messageProps.setApplicationHeaders(map); - } - } // send the message try @@ -221,7 +195,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer try { ssn.messageTransfer(destination.getExchangeName().toString(), - message.get010Message(), + underlyingMessage, ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 8ca68850eb..c547fcb488 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -27,6 +27,8 @@ import javax.jms.Message; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.AMQMessageDelegate; +import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicConsumeBody; @@ -81,7 +83,8 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer message.prepareForSending(); ByteBuffer payload = message.getData(); - BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); + AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); + BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); if (!_disableTimestamps) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java index 8741a1cbb6..e69de29bb2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java @@ -1,135 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -import javax.jms.JMSException; - -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.framing.ContentHeaderProperties; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -import java.math.BigDecimal; - -public class AMQMessage -{ - protected ContentHeaderProperties _contentHeaderProperties; - - /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - protected AMQSession _session; - - protected final long _deliveryTag; - - public AMQMessage(ContentHeaderProperties properties, long deliveryTag) - { - _contentHeaderProperties = properties; - _deliveryTag = deliveryTag; - } - - public AMQMessage(ContentHeaderProperties properties) - { - this(properties, -1); - } - - /** - * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls - * acknowledge() - * - * @param s the AMQ session that delivered this message - */ - public void setAMQSession(AMQSession s) - { - _session = s; - } - - public AMQSession getAMQSession() - { - return _session; - } - - /** - * Get the AMQ message number assigned to this message - * - * @return the message number - */ - public long getDeliveryTag() - { - return _deliveryTag; - } - - /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */ - public void prepareForSending() throws JMSException - { - } - - public FieldTable getPropertyHeaders() - { - return ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders(); - } - - public void setDecimalProperty(AMQShortString propertyName, BigDecimal bd) throws JMSException - { - getPropertyHeaders().setDecimal(propertyName, bd); - } - - public void setIntProperty(AMQShortString propertyName, int i) throws JMSException - { - getPropertyHeaders().setInteger(propertyName, new Integer(i)); - } - - public void setLongStringProperty(AMQShortString propertyName, String value) - { - getPropertyHeaders().setString(propertyName, value); - } - - public void setTimestampProperty(AMQShortString propertyName, long value) - { - getPropertyHeaders().setTimestamp(propertyName, value); - } - - public void setVoidProperty(AMQShortString propertyName) - { - getPropertyHeaders().setVoid(propertyName); - } - - //** Getters - - public BigDecimal getDecimalProperty(AMQShortString propertyName) throws JMSException - { - return getPropertyHeaders().getDecimal(propertyName); - } - - public int getIntegerProperty(AMQShortString propertyName) throws JMSException - { - return getPropertyHeaders().getInteger(propertyName); - } - - public String getLongStringProperty(AMQShortString propertyName) - { - return getPropertyHeaders().getString(propertyName); - } - - public Long getTimestampProperty(AMQShortString propertyName) - { - return getPropertyHeaders().getTimestamp(propertyName); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java new file mode 100644 index 0000000000..7f43e4b4b2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.client.message; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.BasicContentHeaderProperties; + +import javax.jms.Destination; +import javax.jms.JMSException; +import java.util.Enumeration; +import java.util.UUID; + +public interface AMQMessageDelegate +{ + void acknowledgeThis() throws JMSException; + + String getJMSMessageID() throws JMSException; + + void setJMSMessageID(String string) throws JMSException; + + long getJMSTimestamp() throws JMSException; + + void setJMSTimestamp(long l) throws JMSException; + + byte[] getJMSCorrelationIDAsBytes() throws JMSException; + + void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException; + + void setJMSCorrelationID(String string) throws JMSException; + + String getJMSCorrelationID() throws JMSException; + + Destination getJMSReplyTo() throws JMSException; + + void setJMSReplyTo(Destination destination) throws JMSException; + + Destination getJMSDestination() throws JMSException; + + int getJMSDeliveryMode() throws JMSException; + + void setJMSDeliveryMode(int i) throws JMSException; + + String getJMSType() throws JMSException; + + void setJMSType(String string) throws JMSException; + + long getJMSExpiration() throws JMSException; + + void setJMSExpiration(long l) throws JMSException; + + int getJMSPriority() throws JMSException; + + void setJMSPriority(int i) throws JMSException; + + void clearProperties() throws JMSException; + + boolean propertyExists(String string) throws JMSException; + + boolean getBooleanProperty(String string) throws JMSException; + + byte getByteProperty(String string) throws JMSException; + + short getShortProperty(String string) throws JMSException; + + int getIntProperty(String string) throws JMSException; + + long getLongProperty(String string) throws JMSException; + + float getFloatProperty(String string) throws JMSException; + + double getDoubleProperty(String string) throws JMSException; + + String getStringProperty(String string) throws JMSException; + + Object getObjectProperty(String string) throws JMSException; + + Enumeration getPropertyNames() throws JMSException; + + void setBooleanProperty(String string, boolean b) throws JMSException; + + void setByteProperty(String string, byte b) throws JMSException; + + void setShortProperty(String string, short i) throws JMSException; + + void setIntProperty(String string, int i) throws JMSException; + + void setLongProperty(String string, long l) throws JMSException; + + void setFloatProperty(String string, float v) throws JMSException; + + void setDoubleProperty(String string, double v) throws JMSException; + + void setStringProperty(String string, String string1) throws JMSException; + + void setObjectProperty(String string, Object object) throws JMSException; + + void acknowledge() throws JMSException; + + public void setJMSDestination(Destination destination); + + public void setContentType(String contentType); + public String getContentType(); + + public void setEncoding(String encoding); + public String getEncoding(); + + + String getReplyToString(); + + void removeProperty(final String propertyName) throws JMSException; + + void setAMQSession(final AMQSession s); + + AMQSession getAMQSession(); + + long getDeliveryTag(); + + void setJMSMessageID(final UUID messageId) throws JMSException; +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java new file mode 100644 index 0000000000..8c3f2fd08f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.client.message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.AMQException; + +public interface AMQMessageDelegateFactory<D extends AMQMessageDelegate> +{ + public static AMQMessageDelegateFactory DEFAULT_FACTORY = null; + + public static AMQMessageDelegateFactory<AMQMessageDelegate_0_8> FACTORY_0_8 = + new AMQMessageDelegateFactory<AMQMessageDelegate_0_8>() + { + public AMQMessageDelegate_0_8 createDelegate() + { + return new AMQMessageDelegate_0_8(); + } + }; + + public static AMQMessageDelegateFactory<AMQMessageDelegate_0_10> FACTORY_0_10 = + new AMQMessageDelegateFactory<AMQMessageDelegate_0_10>() + { + public AMQMessageDelegate_0_10 createDelegate() + { + return new AMQMessageDelegate_0_10(); + } + }; + + + public D createDelegate(); + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java new file mode 100644 index 0000000000..97cb2baee4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -0,0 +1,900 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.client.message; + +import org.apache.commons.collections.map.ReferenceMap; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQUndefinedDestination; +import org.apache.qpid.client.JMSAMQException; +import org.apache.qpid.framing.ContentHeaderProperties; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.jms.Message; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.transport.*; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import javax.jms.MessageFormatException; +import javax.jms.DeliveryMode; +import java.util.*; +import java.net.URISyntaxException; +import java.nio.charset.Charset; + +public class AMQMessageDelegate_0_10 implements AMQMessageDelegate +{ + private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap()); + + public static final String JMS_TYPE = "x-jms-type"; + + + private boolean _readableProperties = false; + + private Destination _destination; + + + private MessageProperties _messageProps; + private DeliveryProperties _deliveryProps; + /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ + private AMQSession _session; + private final long _deliveryTag; + + private static Map<String,Integer> _exchangeTypeMap = new HashMap<String, Integer>(); + + static + { + // TODO - XXX - Need to add to this map when we find an exchange we don't know about + + _exchangeTypeMap.put(null, AMQDestination.QUEUE_TYPE); + _exchangeTypeMap.put("amq.direct", AMQDestination.QUEUE_TYPE); + _exchangeTypeMap.put("", AMQDestination.QUEUE_TYPE); + _exchangeTypeMap.put("amq.topic", AMQDestination.TOPIC_TYPE); + _exchangeTypeMap.put("amq.fanout", AMQDestination.TOPIC_TYPE); + + + } + + protected AMQMessageDelegate_0_10() + { + this(new MessageProperties(), new DeliveryProperties(), -1); + _readableProperties = false; + + + } + + protected AMQMessageDelegate_0_10(long deliveryTag, MessageProperties messageProps, DeliveryProperties deliveryProps, AMQShortString exchange, + AMQShortString routingKey) throws AMQException + { + this(messageProps, deliveryProps, deliveryTag); + + + AMQDestination dest; + + dest = new AMQUndefinedDestination(exchange, routingKey, null); + + // Destination dest = AMQDestination.createDestination(url); + setJMSDestination(dest); + + + + } + + protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag) + { + _messageProps = messageProps; + _deliveryProps = deliveryProps; + _deliveryTag = deliveryTag; + _readableProperties = (_messageProps != null); + + } + + + public String getJMSMessageID() throws JMSException + { + UUID id = _messageProps.getMessageId(); + return id == null ? null : "ID:" + id; + } + + public void setJMSMessageID(String messageId) throws JMSException + { + if(messageId == null) + { + _messageProps.clearMessageId(); + } + else + { + if(messageId.startsWith("ID:")) + { + try + { + _messageProps.setMessageId(UUID.fromString(messageId.substring(3))); + } + catch(IllegalArgumentException ex) + { + throw new JMSException("MessageId '"+messageId+"' is not of the correct format, it must be ID: followed by a UUID"); + } + } + else + { + throw new JMSException("MessageId '"+messageId+"' is not of the correct format, it must be ID: followed by a UUID"); + } + } + } + + public void setJMSMessageID(UUID messageId) throws JMSException + { + if(messageId == null) + { + _messageProps.clearMessageId(); + } + else + { + _messageProps.setMessageId(messageId); + } + } + + + public long getJMSTimestamp() throws JMSException + { + return _deliveryProps.getTimestamp(); + } + + public void setJMSTimestamp(long timestamp) throws JMSException + { + _deliveryProps.setTimestamp(timestamp); + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException + { + return _messageProps.getCorrelationId(); + } + + public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException + { + _messageProps.setCorrelationId(bytes); + } + + public void setJMSCorrelationID(String correlationId) throws JMSException + { + + setJMSCorrelationIDAsBytes(correlationId == null ? null : correlationId.getBytes()); + } + + public String getJMSCorrelationID() throws JMSException + { + + byte[] correlationIDAsBytes = getJMSCorrelationIDAsBytes(); + return correlationIDAsBytes == null ? null : new String(correlationIDAsBytes); + } + + public Destination getJMSReplyTo() + { + ReplyTo replyTo = _messageProps.getReplyTo(); + + if (replyTo == null) + { + return null; + } + else + { + Destination dest = _destinationCache.get(replyTo); + if (dest == null) + { + String exchange = replyTo.getExchange(); + String routingKey = replyTo.getRoutingKey(); + + int type = _exchangeTypeMap.get(exchange); + + switch(type) + { + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(new AMQShortString(exchange), new AMQShortString(routingKey), new AMQShortString(routingKey)); + break; + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(new AMQShortString(exchange), new AMQShortString(routingKey), null); + break; + default: + dest = new AMQUndefinedDestination(new AMQShortString(exchange), new AMQShortString(routingKey), null); + } + + + + _destinationCache.put(replyTo, dest); + } + + return dest; + } + } + + public void setJMSReplyTo(Destination destination) throws JMSException + { + if (destination == null) + { + throw new IllegalArgumentException("Null destination not allowed"); + } + + if (!(destination instanceof AMQDestination)) + { + throw new IllegalArgumentException( + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); + } + + final AMQDestination amqd = (AMQDestination) destination; + + final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString()); + _destinationCache.put(replyTo, destination); + _messageProps.setReplyTo(replyTo); + + } + + public Destination getJMSDestination() throws JMSException + { + return _destination; + } + + public void setJMSDestination(Destination destination) + { + _destination = destination; + } + + public void setContentType(String contentType) + { + _messageProps.setContentType(contentType); + } + + public String getContentType() + { + return _messageProps.getContentType(); + } + + public void setEncoding(String encoding) + { + if(encoding == null || encoding.length() == 0) + { + _messageProps.clearContentEncoding(); + } + else + { + _messageProps.setContentEncoding(encoding); + } + } + + public String getEncoding() + { + return _messageProps.getContentEncoding(); + } + + public String getReplyToString() + { + Destination replyTo = getJMSReplyTo(); + if(replyTo != null) + { + return ((AMQDestination)replyTo).toURL(); + } + else + { + return null; + } + + } + + public int getJMSDeliveryMode() throws JMSException + { + + MessageDeliveryMode deliveryMode = _deliveryProps.getDeliveryMode(); + if(deliveryMode != null) + { + switch(deliveryMode) + { + case PERSISTENT : + return DeliveryMode.PERSISTENT; + case NON_PERSISTENT: + return DeliveryMode.NON_PERSISTENT; + default: + throw new JMSException("Unknown Message Delivery Mode: " + _deliveryProps.getDeliveryMode()); + } + } + else + { + return Message.DEFAULT_DELIVERY_MODE; + } + + } + + public void setJMSDeliveryMode(int deliveryMode) throws JMSException + { + switch(deliveryMode) + { + case DeliveryMode.PERSISTENT: + _deliveryProps.setDeliveryMode(MessageDeliveryMode.PERSISTENT); + break; + case DeliveryMode.NON_PERSISTENT: + _deliveryProps.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); + break; + default: + throw new JMSException("Unknown JMS Delivery Mode: " + deliveryMode); + } + + } + + + public String getJMSType() throws JMSException + { + if(getApplicationHeaders().containsKey(JMS_TYPE)) + { + return getStringProperty(JMS_TYPE); + } + else + { + return null; + } + } + + private Map<String, Object> getApplicationHeaders() + { + Map<String, Object> map = _messageProps.getApplicationHeaders(); + return map == null ? Collections.EMPTY_MAP : map; + } + + public void setJMSType(String type) throws JMSException + { + Map<String, Object> headers = _messageProps.getApplicationHeaders(); + if(type == null) + { + if(headers != null) + { + headers.remove(JMS_TYPE); + } + } + else + { + if(headers == null) + { + headers = new HashMap<String,Object>(); + _messageProps.setApplicationHeaders(headers); + + } + headers.put(JMS_TYPE, type); + } + } + + public long getJMSExpiration() throws JMSException + { + return _deliveryProps.getExpiration(); + } + + public void setJMSExpiration(long l) throws JMSException + { + _deliveryProps.setExpiration(l); + } + + + + public boolean propertyExists(String propertyName) throws JMSException + { + return getApplicationHeaders().containsKey(propertyName); + } + + public boolean getBooleanProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + + Object o = getApplicationHeaders().get(propertyName); + + if(o instanceof Boolean) + { + return ((Boolean)o).booleanValue(); + } + else if(o instanceof String) + { + return Boolean.valueOf((String) o).booleanValue(); + } + else if(getApplicationHeaders().containsKey(propertyName)) + { + throw new MessageFormatException("getBooleanProperty(\""+propertyName+"\") failed as value is not boolean: " + o); + } + else + { + return Boolean.valueOf(null); + } + } + + public byte getByteProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + + Map<String, Object> propertyMap = getApplicationHeaders(); + + Object o = propertyMap.get(propertyName); + + if(o instanceof Byte) + { + return ((Byte)o).byteValue(); + } + else if(o instanceof String) + { + return Byte.valueOf((String) o).byteValue(); + } + else if(getApplicationHeaders().containsKey(propertyName)) + { + throw new MessageFormatException("getByteProperty(\""+propertyName+"\") failed as value is not a byte: " + o); + } + else + { + return Byte.valueOf(null); + } + } + + public short getShortProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + + Map<String, Object> propertyMap = getApplicationHeaders(); + + Object o = propertyMap.get(propertyName); + + if(o instanceof Short) + { + return ((Short)o).shortValue(); + } + else + { + try + { + return Short.valueOf(getByteProperty(propertyName)); + } + catch(MessageFormatException e) + { + throw new MessageFormatException("getShortProperty(\""+propertyName+"\") failed as value is not a short: " + o); + } + } + + + } + + public int getIntProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + + Map<String, Object> propertyMap = getApplicationHeaders(); + + Object o = propertyMap.get(propertyName); + + if(o instanceof Integer) + { + return ((Integer)o).intValue(); + } + else + { + try + { + return Integer.valueOf(getShortProperty(propertyName)); + } + catch(MessageFormatException e) + { + throw new MessageFormatException("getIntProperty(\""+propertyName+"\") failed as value is not an int: " + o); + } + + } + } + + public long getLongProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + + Map<String, Object> propertyMap = getApplicationHeaders(); + + Object o = propertyMap.get(propertyName); + + if(o instanceof Long) + { + return ((Long)o).longValue(); + } + else + { + try + { + return Long.valueOf(getIntProperty(propertyName)); + } + catch(MessageFormatException e) + { + throw new MessageFormatException("getLongProperty(\""+propertyName+"\") failed as value is not a long: " + o); + } + + } + } + + public float getFloatProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + Map<String, Object> propertyMap = getApplicationHeaders(); + + Object o = propertyMap.get(propertyName); + + if(o instanceof Float) + { + return ((Float)o).floatValue(); + } + else if(o instanceof String) + { + return Float.valueOf((String) o).floatValue(); + } + else if(getApplicationHeaders().containsKey(propertyName)) + { + throw new MessageFormatException("getFloatProperty(\""+propertyName+"\") failed as value is not a float: " + o); + } + else + { + return Float.valueOf(null); + } + + } + + public double getDoubleProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + + Map<String, Object> propertyMap = getApplicationHeaders(); + + Object o = propertyMap.get(propertyName); + + if(o instanceof Double) + { + return ((Double)o).doubleValue(); + } + else + { + try + { + return Double.valueOf(getFloatProperty(propertyName)); + } + catch(MessageFormatException e) + { + throw new MessageFormatException("getDoubleProperty(\""+propertyName+"\") failed as value is not a double: " + o); + } + + } + } + + public String getStringProperty(String propertyName) throws JMSException + { + if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString())) + { + return new String(_messageProps.getUserId()); + } + else + { + checkPropertyName(propertyName); + Map<String, Object> propertyMap = getApplicationHeaders(); + + Object o = propertyMap.get(propertyName); + + if(o instanceof String) + { + return (String) o; + } + else if(o == null) + { + if(propertyMap.containsKey(propertyName)) + { + return null; + } + else + { + return String.valueOf(null); + } + } + else if(o.getClass().isArray()) + { + throw new MessageFormatException("getString(\""+propertyName+"\") failed as value of type " + o.getClass()+ " is an array."); + } + else + { + return String.valueOf(o); + } + + } + } + + public Object getObjectProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + Map<String, Object> propertyMap = getApplicationHeaders(); + + return propertyMap.get(propertyName); + + } + + public Enumeration getPropertyNames() throws JMSException + { + return java.util.Collections.enumeration(getApplicationHeaders().keySet()); + } + + public void setBooleanProperty(String propertyName, boolean b) throws JMSException + { + checkWritableProperties(); + setApplicationHeader(propertyName, b); + } + + public void setByteProperty(String propertyName, byte b) throws JMSException + { + checkWritableProperties(); + setApplicationHeader(propertyName, b); + } + + public void setShortProperty(String propertyName, short i) throws JMSException + { + checkWritableProperties(); + setApplicationHeader(propertyName, i); + } + + public void setIntProperty(String propertyName, int i) throws JMSException + { + checkWritableProperties(); + setApplicationHeader(propertyName, i); + } + + public void setLongProperty(String propertyName, long l) throws JMSException + { + checkWritableProperties(); + setApplicationHeader(propertyName, l); + } + + public void setFloatProperty(String propertyName, float f) throws JMSException + { + checkWritableProperties(); + setApplicationHeader(propertyName, f); + } + + public void setDoubleProperty(String propertyName, double v) throws JMSException + { + checkWritableProperties(); + setApplicationHeader(propertyName, v); + } + + public void setStringProperty(String propertyName, String value) throws JMSException + { + checkWritableProperties(); + setApplicationHeader(propertyName, value); + } + + public void setObjectProperty(String propertyName, Object object) throws JMSException + { + checkWritableProperties(); + setApplicationHeader(propertyName, object); + } + + private void setApplicationHeader(String propertyName, Object object) + { + Map<String, Object> headers = _messageProps.getApplicationHeaders(); + if(headers == null) + { + headers = new HashMap<String,Object>(); + _messageProps.setApplicationHeaders(headers); + } + headers.put(propertyName, object); + } + + public void removeProperty(String propertyName) throws JMSException + { + Map<String, Object> headers = _messageProps.getApplicationHeaders(); + if(headers != null) + { + headers.remove(propertyName); + } + } + + + + protected void checkWritableProperties() throws MessageNotWriteableException + { + if (_readableProperties) + { + throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); + } + } + + + public int getJMSPriority() throws JMSException + { + MessageDeliveryPriority messageDeliveryPriority = _deliveryProps.getPriority(); + return messageDeliveryPriority == null ? Message.DEFAULT_PRIORITY : messageDeliveryPriority.getValue(); + } + + public void setJMSPriority(int i) throws JMSException + { + _deliveryProps.setPriority(MessageDeliveryPriority.get((short)i)); + } + + public void clearProperties() throws JMSException + { + if(!getApplicationHeaders().isEmpty()) + { + getApplicationHeaders().clear(); + } + + _readableProperties = false; + } + + + public void acknowledgeThis() throws JMSException + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + if (_session != null) + { + if (_session.getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection is already closed"); + } + + // we set multiple to true here since acknowledgement implies acknowledge of all previous messages + // received on the session + _session.acknowledgeMessage(_deliveryTag, true); + } + } + + public void acknowledge() throws JMSException + { + if (_session != null) + { + _session.acknowledge(); + } + } + + + /** + * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls + * acknowledge() + * + * @param s the AMQ session that delivered this message + */ + public void setAMQSession(AMQSession s) + { + _session = s; + } + + public AMQSession getAMQSession() + { + return _session; + } + + /** + * Get the AMQ message number assigned to this message + * + * @return the message number + */ + public long getDeliveryTag() + { + return _deliveryTag; + } + + + + + + + protected void checkPropertyName(CharSequence propertyName) + { + if (propertyName == null) + { + throw new IllegalArgumentException("Property name must not be null"); + } + else if (propertyName.length() == 0) + { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + checkIdentiferFormat(propertyName); + } + + protected void checkIdentiferFormat(CharSequence propertyName) + { +// JMS requirements 3.5.1 Property Names +// Identifiers: +// - An identifier is an unlimited-length character sequence that must begin +// with a Java identifier start character; all following characters must be Java +// identifier part characters. An identifier start character is any character for +// which the method Character.isJavaIdentifierStart returns true. This includes +// '_' and '$'. An identifier part character is any character for which the +// method Character.isJavaIdentifierPart returns true. +// - Identifiers cannot be the names NULL, TRUE, or FALSE. +// Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or +// ESCAPE. +// Identifiers are either header field references or property references. The +// type of a property value in a message selector corresponds to the type +// used to set the property. If a property that does not exist in a message is +// referenced, its value is NULL. The semantics of evaluating NULL values +// in a selector are described in Section 3.8.1.2, Null Values. +// The conversions that apply to the get methods for properties do not +// apply when a property is used in a message selector expression. For +// example, suppose you set a property as a string value, as in the +// following: +// myMessage.setStringProperty("NumberOfOrders", "2"); +// The following expression in a message selector would evaluate to false, +// because a string cannot be used in an arithmetic expression: +// "NumberOfOrders > 1" +// Identifiers are case sensitive. +// Message header field references are restricted to JMSDeliveryMode, +// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and +// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be +// null and if so are treated as a NULL value. + + if (Boolean.getBoolean("strict-jms")) + { + // JMS start character + if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); + } + + // JMS part character + int length = propertyName.length(); + for (int c = 1; c < length; c++) + { + if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); + } + } + + // JMS invalid names + if ((propertyName.equals("NULL") + || propertyName.equals("TRUE") + || propertyName.equals("FALSE") + || propertyName.equals("NOT") + || propertyName.equals("AND") + || propertyName.equals("OR") + || propertyName.equals("BETWEEN") + || propertyName.equals("LIKE") + || propertyName.equals("IN") + || propertyName.equals("IS") + || propertyName.equals("ESCAPE"))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); + } + } + + } + + + public MessageProperties getMessageProperties() + { + return _messageProps; + } + + + public DeliveryProperties getDeliveryProperties() + { + return _deliveryProps; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java new file mode 100644 index 0000000000..4c20a44849 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -0,0 +1,567 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.client.message; + +import org.apache.commons.collections.map.ReferenceMap; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQUndefinedDestination; +import org.apache.qpid.client.JMSAMQException; +import org.apache.qpid.framing.ContentHeaderProperties; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.AMQBindingURL; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import java.util.Map; +import java.util.Collections; +import java.util.Enumeration; +import java.util.UUID; +import java.net.URISyntaxException; + +public class AMQMessageDelegate_0_8 implements AMQMessageDelegate +{ + private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); + + public static final String JMS_TYPE = "x-jms-type"; + + + private boolean _readableProperties = false; + + private Destination _destination; + private JMSHeaderAdapter _headerAdapter; + private static final boolean STRICT_AMQP_COMPLIANCE = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); + + private ContentHeaderProperties _contentHeaderProperties; + /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ + private AMQSession _session; + private final long _deliveryTag; + + protected AMQMessageDelegate_0_8() + { + this(new BasicContentHeaderProperties(), -1); + _readableProperties = false; + _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); + + } + + protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, + AMQShortString routingKey) + { + this(contentHeader, deliveryTag); + + Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); + + if(type == null) + { + type = AMQDestination.UNKNOWN_TYPE; + } + + AMQDestination dest; + + switch(type.intValue()) + { + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; + default: + dest = new AMQUndefinedDestination(exchange, routingKey, null); + } + + + + // Destination dest = AMQDestination.createDestination(url); + setJMSDestination(dest); + + + + } + + protected AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag) + { + _contentHeaderProperties = properties; + _deliveryTag = deliveryTag; + _readableProperties = (_contentHeaderProperties != null); + _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); + } + + + public String getJMSMessageID() throws JMSException + { + return getContentHeaderProperties().getMessageIdAsString(); + } + + public void setJMSMessageID(String messageId) throws JMSException + { + getContentHeaderProperties().setMessageId(messageId); + } + + public void setJMSMessageID(UUID messageId) throws JMSException + { + getContentHeaderProperties().setMessageId("ID:" + messageId); + } + + + public long getJMSTimestamp() throws JMSException + { + return getContentHeaderProperties().getTimestamp(); + } + + public void setJMSTimestamp(long timestamp) throws JMSException + { + getContentHeaderProperties().setTimestamp(timestamp); + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException + { + return getContentHeaderProperties().getCorrelationIdAsString().getBytes(); + } + + public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException + { + getContentHeaderProperties().setCorrelationId(new String(bytes)); + } + + public void setJMSCorrelationID(String correlationId) throws JMSException + { + getContentHeaderProperties().setCorrelationId(correlationId); + } + + public String getJMSCorrelationID() throws JMSException + { + return getContentHeaderProperties().getCorrelationIdAsString(); + } + + public Destination getJMSReplyTo() throws JMSException + { + String replyToEncoding = getContentHeaderProperties().getReplyToAsString(); + if (replyToEncoding == null) + { + return null; + } + else + { + Destination dest = (Destination) _destinationCache.get(replyToEncoding); + if (dest == null) + { + try + { + BindingURL binding = new AMQBindingURL(replyToEncoding); + dest = AMQDestination.createDestination(binding); + } + catch (URISyntaxException e) + { + throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); + } + + _destinationCache.put(replyToEncoding, dest); + } + + return dest; + } + } + + public void setJMSReplyTo(Destination destination) throws JMSException + { + if (destination == null) + { + throw new IllegalArgumentException("Null destination not allowed"); + } + + if (!(destination instanceof AMQDestination)) + { + throw new IllegalArgumentException( + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); + } + + final AMQDestination amqd = (AMQDestination) destination; + + final AMQShortString encodedDestination = amqd.getEncodedName(); + _destinationCache.put(encodedDestination, destination); + getContentHeaderProperties().setReplyTo(encodedDestination); + } + + public Destination getJMSDestination() throws JMSException + { + return _destination; + } + + public void setJMSDestination(Destination destination) + { + _destination = destination; + } + + public void setContentType(String contentType) + { + getContentHeaderProperties().setContentType(contentType); + } + + public String getContentType() + { + return getContentHeaderProperties().getContentTypeAsString(); + } + + public void setEncoding(String encoding) + { + getContentHeaderProperties().setEncoding(encoding); + } + + public String getEncoding() + { + return getContentHeaderProperties().getEncodingAsString(); + } + + public String getReplyToString() + { + return getContentHeaderProperties().getReplyToAsString(); + } + + public int getJMSDeliveryMode() throws JMSException + { + return getContentHeaderProperties().getDeliveryMode(); + } + + public void setJMSDeliveryMode(int i) throws JMSException + { + getContentHeaderProperties().setDeliveryMode((byte) i); + } + + public BasicContentHeaderProperties getContentHeaderProperties() + { + return (BasicContentHeaderProperties) _contentHeaderProperties; + } + + + public String getJMSType() throws JMSException + { + return getContentHeaderProperties().getTypeAsString(); + } + + public void setJMSType(String string) throws JMSException + { + getContentHeaderProperties().setType(string); + } + + public long getJMSExpiration() throws JMSException + { + return getContentHeaderProperties().getExpiration(); + } + + public void setJMSExpiration(long l) throws JMSException + { + getContentHeaderProperties().setExpiration(l); + } + + + + public boolean propertyExists(String propertyName) throws JMSException + { + return getJmsHeaders().propertyExists(propertyName); + } + + public boolean getBooleanProperty(String propertyName) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getBoolean(propertyName); + } + + public byte getByteProperty(String propertyName) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getByte(propertyName); + } + + public short getShortProperty(String propertyName) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getShort(propertyName); + } + + public int getIntProperty(String propertyName) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getInteger(propertyName); + } + + public long getLongProperty(String propertyName) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getLong(propertyName); + } + + public float getFloatProperty(String propertyName) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getFloat(propertyName); + } + + public double getDoubleProperty(String propertyName) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getDouble(propertyName); + } + + public String getStringProperty(String propertyName) throws JMSException + { + //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below. + if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString())) + { + return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString(); + } + else + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getString(propertyName); + } + } + + public Object getObjectProperty(String propertyName) throws JMSException + { + return getJmsHeaders().getObject(propertyName); + } + + public Enumeration getPropertyNames() throws JMSException + { + return getJmsHeaders().getPropertyNames(); + } + + public void setBooleanProperty(String propertyName, boolean b) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setBoolean(propertyName, b); + } + + public void setByteProperty(String propertyName, byte b) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setByte(propertyName, new Byte(b)); + } + + public void setShortProperty(String propertyName, short i) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setShort(propertyName, new Short(i)); + } + + public void setIntProperty(String propertyName, int i) throws JMSException + { + checkWritableProperties(); + getJmsHeaders().setInteger(propertyName, new Integer(i)); + } + + public void setLongProperty(String propertyName, long l) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setLong(propertyName, new Long(l)); + } + + public void setFloatProperty(String propertyName, float f) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setFloat(propertyName, new Float(f)); + } + + public void setDoubleProperty(String propertyName, double v) throws JMSException + { + if (STRICT_AMQP_COMPLIANCE) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setDouble(propertyName, new Double(v)); + } + + public void setStringProperty(String propertyName, String value) throws JMSException + { + checkWritableProperties(); + getJmsHeaders().setString(propertyName, value); + } + + public void setObjectProperty(String propertyName, Object object) throws JMSException + { + checkWritableProperties(); + getJmsHeaders().setObject(propertyName, object); + } + + public void removeProperty(String propertyName) throws JMSException + { + getJmsHeaders().remove(propertyName); + } + + + private JMSHeaderAdapter getJmsHeaders() + { + return _headerAdapter; + } + + protected void checkWritableProperties() throws MessageNotWriteableException + { + if (_readableProperties) + { + throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); + } + _contentHeaderProperties.updated(); + } + + + public int getJMSPriority() throws JMSException + { + return getContentHeaderProperties().getPriority(); + } + + public void setJMSPriority(int i) throws JMSException + { + getContentHeaderProperties().setPriority((byte) i); + } + + public void clearProperties() throws JMSException + { + getJmsHeaders().clear(); + + _readableProperties = false; + } + + + public void acknowledgeThis() throws JMSException + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + if (_session != null) + { + if (_session.getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection is already closed"); + } + + // we set multiple to true here since acknowledgement implies acknowledge of all previous messages + // received on the session + _session.acknowledgeMessage(_deliveryTag, true); + } + } + + public void acknowledge() throws JMSException + { + if (_session != null) + { + _session.acknowledge(); + } + } + + + /** + * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls + * acknowledge() + * + * @param s the AMQ session that delivered this message + */ + public void setAMQSession(AMQSession s) + { + _session = s; + } + + public AMQSession getAMQSession() + { + return _session; + } + + /** + * Get the AMQ message number assigned to this message + * + * @return the message number + */ + public long getDeliveryTag() + { + return _deliveryTag; + } + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 94be090cf2..c0d51fa726 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -31,7 +31,6 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; /** * @author Apache Software Foundation @@ -44,21 +43,21 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage */ private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; - AbstractBytesMessage() + AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory) { - this(null); + this(delegateFactory, null); } /** * Construct a bytes message with existing data. * + * @param delegateFactory * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - * set to auto expand */ - AbstractBytesMessage(ByteBuffer data) + AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) { - super(data); // this instanties a content header - getContentHeaderProperties().setContentType(getMimeTypeAsShortString()); + super(delegateFactory, data); // this instanties a content header + setContentType(getMimeType()); if (_data == null) { @@ -72,13 +71,12 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage _data.setAutoExpand(true); } - AbstractBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException - { - // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea - super(messageNbr, contentHeader, exchange, routingKey, data); - getContentHeaderProperties().setContentType(getMimeTypeAsShortString()); - } + AbstractBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException + { + super(delegate, data); + setContentType(getMimeType()); + } + public void clearBodyImpl() throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index 9bdde01bf3..f0fc394b0b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -33,7 +33,6 @@ import javax.jms.MessageNotWriteableException; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; /** @@ -70,27 +69,28 @@ public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage */ private int _byteArrayRemaining = -1; - AbstractBytesTypedMessage() + AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory) { - this(null); + + this(delegateFactory, null); } /** * Construct a stream message with existing data. * + * @param delegateFactory * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - * set to auto expand */ - AbstractBytesTypedMessage(ByteBuffer data) + AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) { - super(data); // this instanties a content header - } + super(delegateFactory, data); // this instanties a content header + } - AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(messageNbr, contentHeader, exchange, routingKey, data); + + super(delegate, data); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index dc3a483538..54da7c4404 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -21,10 +21,7 @@ package org.apache.qpid.client.message; import java.io.IOException; -import java.net.URISyntaxException; -import java.util.Collections; import java.util.Enumeration; -import java.util.Map; import java.util.UUID; import javax.jms.Destination; @@ -32,122 +29,54 @@ import javax.jms.JMSException; import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; -import org.apache.commons.collections.map.ReferenceMap; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQUndefinedDestination; -import org.apache.qpid.client.BasicMessageConsumer; -import org.apache.qpid.client.CustomJMSXProperty; -import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.BindingURL; -public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message +public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message { - private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); - public static final String JMS_TYPE = "x-jms-type"; - protected boolean _redelivered; protected ByteBuffer _data; - private boolean _readableProperties = false; protected boolean _readableMessage = false; protected boolean _changedData = true; - private Destination _destination; - private JMSHeaderAdapter _headerAdapter; - private static final boolean STRICT_AMQP_COMPLIANCE = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); + /** * This is 0_10 specific */ private org.apache.qpid.api.Message _010message = null; + /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - public void set010Message(org.apache.qpid.api.Message m ) - { - _010message = m; - } - public void dataChanged() - { - if (_010message != null) - { - _010message.clearData(); - try - { - if (_data != null) - { - _010message.appendData(_data.buf().slice()); - } - else - { - _010message.appendData(java.nio.ByteBuffer.allocate(0)); - } - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - } - /** - * End 010 specific - */ - public org.apache.qpid.api.Message get010Message() - { - return _010message; - } + private AMQMessageDelegate _delegate; + private boolean _redelivered; - protected AbstractJMSMessage(ByteBuffer data) + protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) { - super(new BasicContentHeaderProperties()); + _delegate = delegateFactory.createDelegate(); _data = data; if (_data != null) { _data.acquire(); } - _readableProperties = false; + _readableMessage = (data != null); _changedData = (data == null); - _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); } - protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - this(contentHeader, deliveryTag); - - Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); - - AMQDestination dest; - if (AMQDestination.QUEUE_TYPE.equals(type)) - { - dest = new AMQQueue(exchange, routingKey, routingKey); - } - else if (AMQDestination.TOPIC_TYPE.equals(type)) - { - dest = new AMQTopic(exchange, routingKey, null); - } - else - { - dest = new AMQUndefinedDestination(exchange, routingKey, null); - } - // Destination dest = AMQDestination.createDestination(url); - setJMSDestination(dest); + _delegate = delegate; _data = data; if (_data != null) @@ -159,126 +88,82 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } - protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) + public String getJMSMessageID() throws JMSException { - super(contentHeader, deliveryTag); - _readableProperties = (_contentHeaderProperties != null); - _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); + return _delegate.getJMSMessageID(); } - public String getJMSMessageID() throws JMSException + public void setJMSMessageID(String messageId) throws JMSException { - return getContentHeaderProperties().getMessageIdAsString(); + _delegate.setJMSMessageID(messageId); } - public void setJMSMessageID(String messageId) throws JMSException + public void setJMSMessageID(UUID messageId) throws JMSException { - getContentHeaderProperties().setMessageId(messageId); + _delegate.setJMSMessageID(messageId); } + public long getJMSTimestamp() throws JMSException { - return getContentHeaderProperties().getTimestamp(); + return _delegate.getJMSTimestamp(); } public void setJMSTimestamp(long timestamp) throws JMSException { - getContentHeaderProperties().setTimestamp(timestamp); + _delegate.setJMSTimestamp(timestamp); } public byte[] getJMSCorrelationIDAsBytes() throws JMSException { - return getContentHeaderProperties().getCorrelationIdAsString().getBytes(); + return _delegate.getJMSCorrelationIDAsBytes(); } public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException { - getContentHeaderProperties().setCorrelationId(new String(bytes)); + _delegate.setJMSCorrelationIDAsBytes(bytes); } public void setJMSCorrelationID(String correlationId) throws JMSException { - getContentHeaderProperties().setCorrelationId(correlationId); + _delegate.setJMSCorrelationID(correlationId); } public String getJMSCorrelationID() throws JMSException { - return getContentHeaderProperties().getCorrelationIdAsString(); + return _delegate.getJMSCorrelationID(); } public Destination getJMSReplyTo() throws JMSException { - String replyToEncoding = getContentHeaderProperties().getReplyToAsString(); - if (replyToEncoding == null) - { - return null; - } - else - { - Destination dest = (Destination) _destinationCache.get(replyToEncoding); - if (dest == null) - { - try - { - BindingURL binding = new AMQBindingURL(replyToEncoding); - dest = AMQDestination.createDestination(binding); - } - catch (URISyntaxException e) - { - throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); - } - - _destinationCache.put(replyToEncoding, dest); - } - - return dest; - } + return _delegate.getJMSReplyTo(); } public void setJMSReplyTo(Destination destination) throws JMSException { - if (destination == null) - { - throw new IllegalArgumentException("Null destination not allowed"); - } - - if (!(destination instanceof AMQDestination)) - { - throw new IllegalArgumentException( - "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); - } - - final AMQDestination amqd = (AMQDestination) destination; - - final AMQShortString encodedDestination = amqd.getEncodedName(); - _destinationCache.put(encodedDestination, destination); - getContentHeaderProperties().setReplyTo(encodedDestination); + _delegate.setJMSReplyTo(destination); } public Destination getJMSDestination() throws JMSException { - return _destination; + return _delegate.getJMSDestination(); } public void setJMSDestination(Destination destination) { - _destination = destination; + _delegate.setJMSDestination(destination); } public int getJMSDeliveryMode() throws JMSException { - return getContentHeaderProperties().getDeliveryMode(); + return _delegate.getJMSDeliveryMode(); } public void setJMSDeliveryMode(int i) throws JMSException { - getContentHeaderProperties().setDeliveryMode((byte) i); + _delegate.setJMSDeliveryMode(i); } - public BasicContentHeaderProperties getContentHeaderProperties() - { - return (BasicContentHeaderProperties) _contentHeaderProperties; - } public boolean getJMSRedelivered() throws JMSException { @@ -290,318 +175,180 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _redelivered = b; } + public String getJMSType() throws JMSException { - return getContentHeaderProperties().getTypeAsString(); + return _delegate.getJMSType(); } public void setJMSType(String string) throws JMSException { - getContentHeaderProperties().setType(string); + _delegate.setJMSType(string); } public long getJMSExpiration() throws JMSException { - return getContentHeaderProperties().getExpiration(); + return _delegate.getJMSExpiration(); } public void setJMSExpiration(long l) throws JMSException { - getContentHeaderProperties().setExpiration(l); + _delegate.setJMSExpiration(l); } public int getJMSPriority() throws JMSException { - return getContentHeaderProperties().getPriority(); + return _delegate.getJMSPriority(); } public void setJMSPriority(int i) throws JMSException { - getContentHeaderProperties().setPriority((byte) i); + _delegate.setJMSPriority(i); } - public void clearProperties() throws JMSException - { - getJmsHeaders().clear(); - - _readableProperties = false; - } - - public void clearBody() throws JMSException - { - clearBodyImpl(); - _readableMessage = false; - } - - public boolean propertyExists(AMQShortString propertyName) throws JMSException - { - return getJmsHeaders().propertyExists(propertyName); - } public boolean propertyExists(String propertyName) throws JMSException { - return getJmsHeaders().propertyExists(propertyName); + return _delegate.propertyExists(propertyName); } - public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException + public boolean getBooleanProperty(final String s) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getBoolean(propertyName); + return _delegate.getBooleanProperty(s); } - public boolean getBooleanProperty(String propertyName) throws JMSException + public byte getByteProperty(final String s) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getBoolean(propertyName); + return _delegate.getByteProperty(s); } - public byte getByteProperty(String propertyName) throws JMSException + public short getShortProperty(final String s) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getByte(propertyName); + return _delegate.getShortProperty(s); } - public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException + public int getIntProperty(final String s) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getBytes(propertyName); + return _delegate.getIntProperty(s); } - public short getShortProperty(String propertyName) throws JMSException + public long getLongProperty(final String s) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getShort(propertyName); - } - - public int getIntProperty(String propertyName) throws JMSException - { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getInteger(propertyName); + return _delegate.getLongProperty(s); } - public long getLongProperty(String propertyName) throws JMSException + public float getFloatProperty(final String s) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getLong(propertyName); + return _delegate.getFloatProperty(s); } - public float getFloatProperty(String propertyName) throws JMSException + public double getDoubleProperty(final String s) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getFloat(propertyName); + return _delegate.getDoubleProperty(s); } - public double getDoubleProperty(String propertyName) throws JMSException + public String getStringProperty(final String s) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getDouble(propertyName); + return _delegate.getStringProperty(s); } - public String getStringProperty(String propertyName) throws JMSException + public Object getObjectProperty(final String s) + throws JMSException { - //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below. - if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString())) - { - return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString(); - } - else - { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - return getJmsHeaders().getString(propertyName); - } + return _delegate.getObjectProperty(s); } - public Object getObjectProperty(String propertyName) throws JMSException + public Enumeration getPropertyNames() + throws JMSException { - return getJmsHeaders().getObject(propertyName); + return _delegate.getPropertyNames(); } - public Enumeration getPropertyNames() throws JMSException + public void setBooleanProperty(final String s, final boolean b) + throws JMSException { - return getJmsHeaders().getPropertyNames(); + _delegate.setBooleanProperty(s, b); } - public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException + public void setByteProperty(final String s, final byte b) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setBoolean(propertyName, b); + _delegate.setByteProperty(s, b); } - public void setBooleanProperty(String propertyName, boolean b) throws JMSException + public void setShortProperty(final String s, final short i) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setBoolean(propertyName, b); + _delegate.setShortProperty(s, i); } - public void setByteProperty(String propertyName, byte b) throws JMSException + public void setIntProperty(final String s, final int i) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setByte(propertyName, new Byte(b)); + _delegate.setIntProperty(s, i); } - public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException + public void setLongProperty(final String s, final long l) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setBytes(propertyName, bytes); + _delegate.setLongProperty(s, l); } - public void setShortProperty(String propertyName, short i) throws JMSException + public void setFloatProperty(final String s, final float v) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setShort(propertyName, new Short(i)); + _delegate.setFloatProperty(s, v); } - public void setIntProperty(String propertyName, int i) throws JMSException + public void setDoubleProperty(final String s, final double v) + throws JMSException { - checkWritableProperties(); - JMSHeaderAdapter.checkPropertyName(propertyName); - super.setIntProperty(new AMQShortString(propertyName), new Integer(i)); + _delegate.setDoubleProperty(s, v); } - public void setLongProperty(String propertyName, long l) throws JMSException + public void setStringProperty(final String s, final String s1) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setLong(propertyName, new Long(l)); + _delegate.setStringProperty(s, s1); } - public void setFloatProperty(String propertyName, float f) throws JMSException + public void setObjectProperty(final String s, final Object o) + throws JMSException { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - - checkWritableProperties(); - getJmsHeaders().setFloat(propertyName, new Float(f)); + _delegate.setObjectProperty(s, o); } - public void setDoubleProperty(String propertyName, double v) throws JMSException - { - if (STRICT_AMQP_COMPLIANCE) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - checkWritableProperties(); - getJmsHeaders().setDouble(propertyName, new Double(v)); - } - public void setStringProperty(String propertyName, String value) throws JMSException + public void clearProperties() throws JMSException { - checkWritableProperties(); - JMSHeaderAdapter.checkPropertyName(propertyName); - super.setLongStringProperty(new AMQShortString(propertyName), value); + _delegate.clearProperties(); } - public void setObjectProperty(String propertyName, Object object) throws JMSException + public void clearBody() throws JMSException { - checkWritableProperties(); - getJmsHeaders().setObject(propertyName, object); - } + clearBodyImpl(); + _readableMessage = false; - protected void removeProperty(AMQShortString propertyName) throws JMSException - { - getJmsHeaders().remove(propertyName); } - protected void removeProperty(String propertyName) throws JMSException - { - getJmsHeaders().remove(propertyName); - } public void acknowledgeThis() throws JMSException { - // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge - // is not specified. In our case, we only set the session field where client acknowledge mode is specified. - if (_session != null) - { - if (_session.getAMQConnection().isClosed()) - { - throw new javax.jms.IllegalStateException("Connection is already closed"); - } - - // we set multiple to true here since acknowledgement implies acknowledge of all previous messages - // received on the session - _session.acknowledgeMessage(_deliveryTag, true); - } + _delegate.acknowledgeThis(); } public void acknowledge() throws JMSException { - if (_session != null) - { - _session.acknowledge(); - } + _delegate.acknowledge(); } /** @@ -617,12 +364,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach */ public abstract String toBodyString() throws JMSException; - public String getMimeType() - { - return getMimeTypeAsShortString().toString(); - } + protected abstract String getMimeType(); + - public abstract AMQShortString getMimeTypeAsShortString(); public String toString() { @@ -640,16 +384,23 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach buf.append("\nJMS Destination: ").append(getJMSDestination()); buf.append("\nJMS Type: ").append(getJMSType()); buf.append("\nJMS MessageID: ").append(getJMSMessageID()); - buf.append("\nAMQ message number: ").append(_deliveryTag); + buf.append("\nAMQ message number: ").append(getDeliveryTag()); buf.append("\nProperties:"); - if (getJmsHeaders().isEmpty()) + final Enumeration propertyNames = getPropertyNames(); + if (!propertyNames.hasMoreElements()) { buf.append("<NONE>"); } else { - buf.append('\n').append(getJmsHeaders().getHeaders()); + buf.append('\n'); + while(propertyNames.hasMoreElements()) + { + String propertyName = (String) propertyNames.nextElement(); + buf.append(propertyName).append(":\t").append(getObjectProperty(propertyName)); + } + } return buf.toString(); @@ -660,14 +411,10 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } - public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties) - { - getContentHeaderProperties().setHeaders(messageProperties); - } - public JMSHeaderAdapter getJmsHeaders() + public AMQMessageDelegate getDelegate() { - return _headerAdapter; + return _delegate; } public ByteBuffer getData() @@ -698,25 +445,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } - protected void checkWritableProperties() throws MessageNotWriteableException - { - if (_readableProperties) - { - throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); - } - _contentHeaderProperties.updated(); - } - - public boolean isReadable() - { - return _readableMessage; - } - - public boolean isWritable() - { - return !_readableMessage; - } - public void reset() { if (!_changedData) @@ -731,6 +459,47 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } + public void set010Message(org.apache.qpid.api.Message m ) + { + _010message = m; + } + + public void dataChanged() + { + if (_010message != null) + { + _010message.clearData(); + try + { + if (_data != null) + { + _010message.appendData(_data.buf().slice()); + } + else + { + _010message.appendData(java.nio.ByteBuffer.allocate(0)); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + /** + * End 010 specific + */ + + public org.apache.qpid.api.Message get010Message() + { + return _010message; + } + + + + + public int getContentLength() { if(_data != null) @@ -748,4 +517,66 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _changedData = false; } + /** + * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls + * acknowledge() + * + * @param s the AMQ session that delivered this message + */ + public void setAMQSession(AMQSession s) + { + _delegate.setAMQSession(s); + } + + public AMQSession getAMQSession() + { + return _delegate.getAMQSession(); + } + + /** + * Get the AMQ message number assigned to this message + * + * @return the message number + */ + public long getDeliveryTag() + { + return _delegate.getDeliveryTag(); + } + + /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */ + public void prepareForSending() throws JMSException + { + } + + + public void setContentType(String contentType) + { + _delegate.setContentType(contentType); + } + + public String getContentType() + { + return _delegate.getContentType(); + } + + public void setEncoding(String encoding) + { + _delegate.setEncoding(encoding); + } + + public String getEncoding() + { + return _delegate.getEncoding(); + } + + public String getReplyToString() + { + return _delegate.getReplyToString(); + } + + protected void removeProperty(final String propertyName) throws JMSException + { + _delegate.removeProperty(propertyName); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index e14c343613..5e16c765af 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -38,17 +38,11 @@ import javax.jms.JMSException; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.UUID; public abstract class AbstractJMSMessageFactory implements MessageFactory { private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class); - protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange, - AMQShortString routingKey, - BasicContentHeaderProperties contentHeader) throws AMQException; - protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException @@ -105,13 +99,18 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory .remaining()); } - return createMessage(messageNbr, data, exchange, routingKey, - (BasicContentHeaderProperties) contentHeader.properties); + AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr, + (BasicContentHeaderProperties) contentHeader.properties, + exchange, routingKey); + + return createMessage(delegate, data); } + protected abstract AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException; + + protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader, - AMQShortString exchange, AMQShortString routingKey, - List bodies, String replyToURL) throws AMQException + List bodies) throws AMQException { ByteBuffer data; final boolean debug = _logger.isDebugEnabled(); @@ -131,40 +130,13 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data .remaining()); } - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); // set the properties of this message MessageProperties mprop = (MessageProperties) contentHeader[0]; DeliveryProperties devprop = (DeliveryProperties) contentHeader[1]; - props.setContentType(mprop.getContentType()); - props.setCorrelationId(asString(mprop.getCorrelationId())); - String encoding = mprop.getContentEncoding(); - if (encoding != null && !encoding.equals("")) - { - props.setEncoding(encoding); - } - if (devprop.hasDeliveryMode()) - { - props.setDeliveryMode((byte) devprop.getDeliveryMode().getValue()); - } - props.setExpiration(devprop.getExpiration()); - UUID mid = mprop.getMessageId(); - props.setMessageId(mid == null ? null : "ID:" + mid.toString()); - if (devprop.hasPriority()) - { - props.setPriority((byte) devprop.getPriority().getValue()); - } - props.setReplyTo(replyToURL); - props.setTimestamp(devprop.getTimestamp()); - String type = null; - Map<String,Object> map = mprop.getApplicationHeaders(); - if (map != null) - { - type = (String) map.get(AbstractJMSMessage.JMS_TYPE); - } - props.setType(type); - props.setUserId(asString(mprop.getUserId())); - props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders())); - AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props); + + AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(mprop, devprop, messageNbr); + + AbstractJMSMessage message = createMessage(delegate, data); return message; } @@ -192,12 +164,11 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory } public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader, - AMQShortString exchange, AMQShortString routingKey, List bodies, - String replyToURL) + List bodies) throws JMSException, AMQException { final AbstractJMSMessage msg = - create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL); + create010MessageWithBody(messageNbr, contentHeader, bodies); msg.setJMSRedelivered(redelivered); msg.receivedFromServer(); return msg; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 4f5641bcff..cd9d7ccf8b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -33,46 +33,47 @@ import javax.jms.MessageFormatException; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage { public static final String MIME_TYPE = "application/octet-stream"; - private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); - public JMSBytesMessage() + + public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory) { - this(null); + this(delegateFactory,null); + } /** * Construct a bytes message with existing data. * + * @param delegateFactory * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - * set to auto expand */ - JMSBytesMessage(ByteBuffer data) + JMSBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) { - super(data); // this instanties a content header + + super(delegateFactory, data); // this instanties a content header } - JMSBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + JMSBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(messageNbr, contentHeader, exchange, routingKey, data); + super(delegate, data); } + public void reset() { super.reset(); _readableMessage = true; } - public AMQShortString getMimeTypeAsShortString() + protected String getMimeType() { - return MIME_TYPE_SHORT_STRING; + return MIME_TYPE; } public long getBodyLength() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java index 0202dc29df..cb04ebee1b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java @@ -25,21 +25,18 @@ import javax.jms.JMSException; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSBytesMessageFactory extends AbstractJMSMessageFactory { - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, - AMQShortString exchange, AMQShortString routingKey, - BasicContentHeaderProperties contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data); + return new JMSBytesMessage(delegate, data); } - public AbstractJMSMessage createMessage() throws JMSException + public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { - return new JMSBytesMessage(); + return new JMSBytesMessage(delegateFactory); } // 0_10 specific diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 8ec7437fa1..6215652c80 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -467,7 +467,7 @@ public final class JMSHeaderAdapter return getPropertyNames(); } - protected static void checkPropertyName(CharSequence propertyName) + protected void checkPropertyName(CharSequence propertyName) { if (propertyName == null) { @@ -481,7 +481,7 @@ public final class JMSHeaderAdapter checkIdentiferFormat(propertyName); } - protected static void checkIdentiferFormat(CharSequence propertyName) + protected void checkIdentiferFormat(CharSequence propertyName) { // JMS requirements 3.5.1 Property Names // Identifiers: diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index fed1f1c609..b6e013ac8f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.slf4j.Logger; @@ -44,18 +43,19 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class); public static final String MIME_TYPE = "jms/map-message"; - private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + private Map<String, Object> _map = new HashMap<String, Object>(); - public JMSMapMessage() throws JMSException + public JMSMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { - this(null); + this(delegateFactory, null); } - JMSMapMessage(ByteBuffer data) throws JMSException + JMSMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException { - super(data); // this instantiates a content header + + super(delegateFactory, data); // this instantiates a content header if(data != null) { populateMapFromData(); @@ -63,10 +63,10 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } - JMSMapMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey, - ByteBuffer data) throws AMQException + JMSMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(messageNbr, contentHeader, exchange, routingKey, data); + + super(delegate, data); try { populateMapFromData(); @@ -79,14 +79,15 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } + public String toBodyString() throws JMSException { return _map == null ? "" : _map.toString(); } - public AMQShortString getMimeTypeAsShortString() + protected String getMimeType() { - return MIME_TYPE_SHORT_STRING; + return MIME_TYPE; } public ByteBuffer getData() diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java index 7cb8b637e6..eccb90560b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java @@ -25,21 +25,18 @@ import javax.jms.JMSException; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSMapMessageFactory extends AbstractJMSMessageFactory { - public AbstractJMSMessage createMessage() throws JMSException + public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { - return new JMSMapMessage(); + return new JMSMapMessage(delegateFactory); } - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, - AMQShortString exchange, AMQShortString routingKey, - BasicContentHeaderProperties contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data); + return new JMSMapMessage(delegate, data); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 385eee47c9..0c431d42e6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -37,52 +37,54 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { public static final String MIME_TYPE = "application/java-object-stream"; - private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + private static final int DEFAULT_BUFFER_SIZE = 1024; /** * Creates empty, writable message for use by producers + * @param delegateFactory */ - public JMSObjectMessage() + public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory) { - this(null); + this(delegateFactory, null); } - private JMSObjectMessage(ByteBuffer data) + private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) { - super(data); + super(delegateFactory, data); if (data == null) { _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); _data.setAutoExpand(true); } - getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); + setContentType(getMimeType()); } /** * Creates read only message for delivery to consumers */ - JMSObjectMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey, - ByteBuffer data) throws AMQException - { - super(messageNbr, contentHeader, exchange, routingKey, data); - } + + JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException + { + super(delegate, data); + } + public void clearBodyImpl() throws JMSException { if (_data != null) { _data.release(); + _data = null; } - _data = null; + } @@ -91,9 +93,9 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag return toString(_data); } - public AMQShortString getMimeTypeAsShortString() + public String getMimeType() { - return MIME_TYPE_SHORT_STRING; + return MIME_TYPE; } public void setObject(Serializable serializable) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java index e7369dcb26..03851dfa01 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java @@ -25,20 +25,17 @@ import javax.jms.JMSException; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSObjectMessageFactory extends AbstractJMSMessageFactory { - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, - AMQShortString exchange, AMQShortString routingKey, - BasicContentHeaderProperties contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data); + return new JMSObjectMessage(delegate, data); } - public AbstractJMSMessage createMessage() throws JMSException + public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { - return new JMSObjectMessage(); + return new JMSObjectMessage(delegateFactory); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 62f3150ed1..ad2620852b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -26,7 +26,6 @@ import javax.jms.StreamMessage; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; /** @@ -35,7 +34,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage { public static final String MIME_TYPE="jms/stream-message"; - private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + /** @@ -44,38 +43,40 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea */ private int _byteArrayRemaining = -1; - public JMSStreamMessage() + public JMSStreamMessage(AMQMessageDelegateFactory delegateFactory) { - this(null); + this(delegateFactory,null); + } /** * Construct a stream message with existing data. * + * @param delegateFactory * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - * set to auto expand */ - JMSStreamMessage(ByteBuffer data) + JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) { - super(data); // this instanties a content header - } + super(delegateFactory, data); // this instanties a content header + } - JMSStreamMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(messageNbr, contentHeader, exchange, routingKey, data); + + super(delegate, data); } + public void reset() { super.reset(); _readableMessage = true; } - public AMQShortString getMimeTypeAsShortString() + protected String getMimeType() { - return MIME_TYPE_SHORT_STRING; + return MIME_TYPE; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java index 4bb648e090..5e25db9ae0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java @@ -25,19 +25,16 @@ import javax.jms.JMSException; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSStreamMessageFactory extends AbstractJMSMessageFactory { - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, - AMQShortString exchange, AMQShortString routingKey, - BasicContentHeaderProperties contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data); + return new JMSStreamMessage(delegate, data); } - public AbstractJMSMessage createMessage() throws JMSException + public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { - return new JMSStreamMessage(); + return new JMSStreamMessage(delegateFactory); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index b29e39a52e..470439d85c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -36,46 +36,44 @@ import org.apache.qpid.util.Strings; public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage { private static final String MIME_TYPE = "text/plain"; - private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); - private String _decodedValue; /** * This constant represents the name of a property that is set when the message payload is null. */ - private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName(); + private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString(); private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); - public JMSTextMessage() throws JMSException + public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { - this(null, null); + this(delegateFactory, null, null); } - JMSTextMessage(ByteBuffer data, String encoding) throws JMSException + JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException { - super(data); // this instantiates a content header - getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); - getContentHeaderProperties().setEncoding(encoding); + super(delegateFactory, data); // this instantiates a content header + setContentType(getMimeType()); + setEncoding(encoding); } - JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) + JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(deliveryTag, contentHeader, exchange, routingKey, data); - contentHeader.setContentType(MIME_TYPE_SHORT_STRING); + super(delegate, data); + setContentType(getMimeType()); _data = data; } - JMSTextMessage(ByteBuffer data) throws JMSException + + JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException { - this(data, null); + this(delegateFactory, data, null); } - JMSTextMessage(String text) throws JMSException + JMSTextMessage(AMQMessageDelegateFactory delegateFactory, String text) throws JMSException { - super((ByteBuffer) null); + super(delegateFactory, (ByteBuffer) null); setText(text); } @@ -84,8 +82,9 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text if (_data != null) { _data.release(); + _data = null; } - _data = null; + _decodedValue = null; } @@ -94,14 +93,9 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text return getText(); } - public void setData(ByteBuffer data) + protected String getMimeType() { - _data = data; - } - - public AMQShortString getMimeTypeAsShortString() - { - return MIME_TYPE_SHORT_STRING; + return MIME_TYPE; } public void setText(String text) throws JMSException @@ -113,7 +107,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { if (text != null) { - final String encoding = getContentHeaderProperties().getEncodingAsString(); + final String encoding = getEncoding(); if (encoding == null || encoding.equalsIgnoreCase("UTF-8")) { _data = ByteBuffer.wrap(Strings.toUTF8(text)); @@ -154,11 +148,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { return null; } - if (getContentHeaderProperties().getEncodingAsString() != null) + if (getEncoding() != null) { try { - _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder()); + _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder()); } catch (CharacterCodingException e) { @@ -197,4 +191,6 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text removeProperty(PAYLOAD_NULL_PROPERTY); } } + + } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java index c578c15a6a..1f4d64c78f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java @@ -26,21 +26,17 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; public class JMSTextMessageFactory extends AbstractJMSMessageFactory { - public AbstractJMSMessage createMessage() throws JMSException + public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException { - return new JMSTextMessage(); + return new JMSTextMessage(delegateFactory); } - protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, - AMQShortString exchange, AMQShortString routingKey, - BasicContentHeaderProperties contentHeader) throws AMQException + protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - return new JMSTextMessage(deliveryTag, contentHeader, - exchange, routingKey, data); + return new JMSTextMessage(delegate, data); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java index f6b11c6f6c..e606ef11c9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java @@ -22,15 +22,9 @@ package org.apache.qpid.client.message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQSession; -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageEOFException; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; +import javax.jms.*; import java.util.Enumeration; @@ -52,12 +46,11 @@ public class MessageConverter _newMessage = message; } - public MessageConverter(BytesMessage message) throws JMSException + public MessageConverter(AMQSession session, BytesMessage bytesMessage) throws JMSException { - BytesMessage bytesMessage = (BytesMessage) message; bytesMessage.reset(); - JMSBytesMessage nativeMsg = new JMSBytesMessage(); + JMSBytesMessage nativeMsg = (JMSBytesMessage) session.createBytesMessage(); byte[] buf = new byte[1024]; @@ -69,12 +62,12 @@ public class MessageConverter } _newMessage = nativeMsg; - setMessageProperties(message); + setMessageProperties(bytesMessage); } - public MessageConverter(MapMessage message) throws JMSException + public MessageConverter(AMQSession session, MapMessage message) throws JMSException { - MapMessage nativeMessage = new JMSMapMessage(); + MapMessage nativeMessage = session.createMapMessage(); Enumeration mapNames = message.getMapNames(); while (mapNames.hasMoreElements()) @@ -87,21 +80,21 @@ public class MessageConverter setMessageProperties(message); } - public MessageConverter(ObjectMessage message) throws JMSException + public MessageConverter(AMQSession session, ObjectMessage origMessage) throws JMSException { - ObjectMessage origMessage = (ObjectMessage) message; - ObjectMessage nativeMessage = new JMSObjectMessage(); + + ObjectMessage nativeMessage = session.createObjectMessage(); nativeMessage.setObject(origMessage.getObject()); _newMessage = (AbstractJMSMessage) nativeMessage; - setMessageProperties(message); + setMessageProperties(origMessage); } - public MessageConverter(TextMessage message) throws JMSException + public MessageConverter(AMQSession session, TextMessage message) throws JMSException { - TextMessage nativeMessage = new JMSTextMessage(); + TextMessage nativeMessage = session.createTextMessage(); nativeMessage.setText(message.getText()); @@ -109,9 +102,9 @@ public class MessageConverter setMessageProperties(message); } - public MessageConverter(StreamMessage message) throws JMSException + public MessageConverter(AMQSession session, StreamMessage message) throws JMSException { - StreamMessage nativeMessage = new JMSStreamMessage(); + StreamMessage nativeMessage = session.createStreamMessage(); try { @@ -130,11 +123,11 @@ public class MessageConverter setMessageProperties(message); } - public MessageConverter(Message message) throws JMSException + public MessageConverter(AMQSession session, Message message) throws JMSException { // Send a message with just properties. // Throwing away content - BytesMessage nativeMessage = new JMSBytesMessage(); + Message nativeMessage = session.createMessage(); _newMessage = (AbstractJMSMessage) nativeMessage; setMessageProperties(message); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 6c9acdef01..424dccc0c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -39,10 +39,9 @@ public interface MessageFactory throws JMSException, AMQException; AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, - Struct[] contentHeader, - AMQShortString exchange, AMQShortString routingKey, - List bodies, String replyToURL) + Struct[] contentHeader, + List bodies) throws JMSException, AMQException; - AbstractJMSMessage createMessage() throws JMSException; + AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 6117b18fde..213d3ebc9e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -31,7 +31,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.transport.Struct; -import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,8 +91,7 @@ public class MessageFactoryRegistry * @param deliveryTag the AMQ message id * @param redelivered true if redelivered * @param contentHeader the content header that was received - * @param bodies a list of ContentBody instances - * @return the message. + * @param bodies a list of ContentBody instances @return the message. * @throws AMQException * @throws JMSException */ @@ -120,9 +118,8 @@ public class MessageFactoryRegistry } } - public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, - AMQShortString routingKey, Struct[] contentHeader, List bodies, - String replyTo) throws AMQException, JMSException + public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, + Struct[] contentHeader, List bodies) throws AMQException, JMSException { MessageProperties mprop = (MessageProperties) contentHeader[0]; String messageType = mprop.getContentType(); @@ -138,12 +135,12 @@ public class MessageFactoryRegistry } else { - return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, replyTo); + return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies); } } - public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException + public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory, String mimeType) throws AMQException, JMSException { if (mimeType == null) { @@ -157,7 +154,7 @@ public class MessageFactoryRegistry } else { - return mf.createMessage(); + return mf.createMessage(delegateFactory); } } } diff --git a/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java b/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java index 700c6a15ac..2c05f5ce0f 100644 --- a/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java +++ b/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java @@ -17,10 +17,9 @@ */ package org.apache.qpid.filter; -import org.apache.qpid.framing.CommonContentHeaderProperties; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.QpidException; +import org.apache.qpid.ErrorCode; import org.slf4j.LoggerFactory; import org.slf4j.Logger; @@ -54,20 +53,7 @@ public class PropertyExpression implements Expression { public Object evaluate(AbstractJMSMessage message) { - try - { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - AMQShortString replyTo = _properties.getReplyTo(); - - return (replyTo == null) ? null : replyTo.toString(); - } - catch (Exception e) - { - _logger.warn("Error evaluating property", e); - - return null; - } + return message.getReplyToString(); } }); @@ -77,13 +63,9 @@ public class PropertyExpression implements Expression { try { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - AMQShortString type = _properties.getType(); - - return (type == null) ? null : type.toString(); + return message.getJMSType(); } - catch (Exception e) + catch (JMSException e) { _logger.warn("Error evaluating property", e); @@ -107,7 +89,7 @@ public class PropertyExpression implements Expression return mode; } - catch (Exception e) + catch (JMSException e) { _logger.warn("Error evaluating property",e); } @@ -122,9 +104,7 @@ public class PropertyExpression implements Expression { try { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - return (int) _properties.getPriority(); + return message.getJMSPriority(); } catch (Exception e) { @@ -142,13 +122,9 @@ public class PropertyExpression implements Expression try { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - AMQShortString messageId = _properties.getMessageId(); - - return (messageId == null) ? null : messageId; + return message.getJMSMessageID(); } - catch (Exception e) + catch (JMSException e) { _logger.warn("Error evaluating property",e); @@ -164,9 +140,7 @@ public class PropertyExpression implements Expression { try { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - return _properties.getTimestamp(); + return message.getJMSTimestamp(); } catch (Exception e) { @@ -185,12 +159,9 @@ public class PropertyExpression implements Expression try { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - AMQShortString correlationId = _properties.getCorrelationId(); - return (correlationId == null) ? null : correlationId.toString(); + return message.getJMSCorrelationID(); } - catch (Exception e) + catch (JMSException e) { _logger.warn("Error evaluating property",e); @@ -207,11 +178,9 @@ public class PropertyExpression implements Expression try { - CommonContentHeaderProperties _properties = - message.getContentHeaderProperties(); - return _properties.getExpiration(); + return message.getJMSExpiration(); } - catch (Exception e) + catch (JMSException e) { _logger.warn("Error evaluating property",e); return null; @@ -257,13 +226,20 @@ public class PropertyExpression implements Expression else { - CommonContentHeaderProperties _properties = message.getContentHeaderProperties(); - if (_logger.isDebugEnabled()) + try + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("Looking up property:" + name); + _logger.debug("Properties are:" + message.getPropertyNames()); + } + return message.getObjectProperty(name); + } + catch(JMSException e) { - _logger.debug("Looking up property:" + name); - _logger.debug("Properties are:" + _properties.getHeaders().keySet()); + throw new QpidException("Exception evaluating properties for filter", ErrorCode.INTERNAL_ERROR, e); } - return _properties.getHeaders().getObject(name); } } diff --git a/java/client/src/main/java/org/apache/qpid/jms/Message.java b/java/client/src/main/java/org/apache/qpid/jms/Message.java index e65f9ad2f4..53c615a1fd 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/Message.java +++ b/java/client/src/main/java/org/apache/qpid/jms/Message.java @@ -24,5 +24,7 @@ import javax.jms.JMSException; public interface Message extends javax.jms.Message { + public static final String JMS_TYPE = "x-jms-type"; + public void acknowledgeThis() throws JMSException; } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java index 8973127105..3b10c44d5a 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java @@ -30,6 +30,12 @@ public class ByteBufferMessage implements Message private int _transferId; private Header _header; + public ByteBufferMessage(MessageProperties messageProperties, DeliveryProperties deliveryProperties) + { + _currentMessageProps = messageProperties; + _currentDeliveryProps = deliveryProperties; + } + public void setHeader(Header header) { _header = header; } diff --git a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java index 9b477c19e2..7ee991b63c 100644 --- a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java +++ b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java @@ -26,21 +26,21 @@ public class TestMessageHelper { public static JMSTextMessage newJMSTextMessage() throws JMSException { - return new JMSTextMessage(); + return new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8); } public static JMSBytesMessage newJMSBytesMessage() throws JMSException { - return new JMSBytesMessage(); + return new JMSBytesMessage(AMQMessageDelegateFactory.FACTORY_0_8); } public static JMSMapMessage newJMSMapMessage() throws JMSException { - return new JMSMapMessage(); + return new JMSMapMessage(AMQMessageDelegateFactory.FACTORY_0_8); } public static JMSStreamMessage newJMSStreamMessage() { - return new JMSStreamMessage(); + return new JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java index fd425b9930..b5e7ae82b5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java @@ -20,20 +20,20 @@ */ package org.apache.qpid.test.unit.message; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.TextMessage; +import javax.jms.*; import junit.framework.TestCase; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.JMSMapMessage; -import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.client.message.MessageConverter; +import org.apache.qpid.client.*; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.message.*; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.AMQException; + +import java.util.Map; public class MessageConverterTest extends TestCase @@ -47,36 +47,38 @@ public class MessageConverterTest extends TestCase protected JMSTextMessage testTextMessage; protected JMSMapMessage testMapMessage; + private AMQSession _session = new TestAMQSession(); + protected void setUp() throws Exception { super.setUp(); - testTextMessage = new JMSTextMessage(); + testTextMessage = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8); //Set Message Text testTextMessage.setText("testTextMessage text"); setMessageProperties(testTextMessage); - testMapMessage = new JMSMapMessage(); + testMapMessage = new JMSMapMessage(AMQMessageDelegateFactory.FACTORY_0_8); testMapMessage.setString("testMapString", "testMapStringValue"); testMapMessage.setDouble("testMapDouble", Double.MAX_VALUE); } public void testSetProperties() throws Exception { - AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage(); + AbstractJMSMessage newMessage = new MessageConverter(_session, (TextMessage) testTextMessage).getConvertedMessage(); mesagePropertiesTest(testTextMessage, newMessage); } public void testJMSTextMessageConversion() throws Exception { - AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage(); + AbstractJMSMessage newMessage = new MessageConverter(_session, (TextMessage) testTextMessage).getConvertedMessage(); assertEquals("Converted message text mismatch", ((JMSTextMessage) newMessage).getText(), testTextMessage.getText()); } public void testJMSMapMessageConversion() throws Exception { - AbstractJMSMessage newMessage = new MessageConverter((MapMessage) testMapMessage).getConvertedMessage(); + AbstractJMSMessage newMessage = new MessageConverter(_session, (MapMessage) testMapMessage).getConvertedMessage(); assertEquals("Converted map message String mismatch", ((JMSMapMessage) newMessage).getString("testMapString"), testMapMessage.getString("testMapString")); assertEquals("Converted map message Double mismatch", ((JMSMapMessage) newMessage).getDouble("testMapDouble"), diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java new file mode 100644 index 0000000000..dfb356432e --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -0,0 +1,171 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.message; + +import org.apache.qpid.client.*; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.AMQException; + +import javax.jms.*; +import java.util.Map; + +public class TestAMQSession extends AMQSession +{ + + public TestAMQSession() + { + super(null, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0); + } + + public void acknowledgeMessage(long deliveryTag, boolean multiple) + { + + } + + public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException + { + + } + + public void sendClose(long timeout) throws AMQException, FailoverException + { + + } + + public void sendCommit() throws AMQException, FailoverException + { + + } + + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + return null; + } + + public void sendCreateQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException + { + + } + + public TemporaryQueue createTemporaryQueue() throws JMSException + { + return null; + } + + protected void sendRecover() throws AMQException, FailoverException + { + + } + + public void rejectMessage(long deliveryTag, boolean requeue) + { + + } + + public void releaseForRollback() + { + + } + + public void sendRollback() throws AMQException, FailoverException + { + + } + + public BasicMessageConsumer createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException + { + return null; + } + + public boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException + { + return false; + } + + public boolean isQueueBound(AMQDestination destination) throws JMSException + { + return false; + } + + public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException + { + + } + + public BasicMessageProducer createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId) + { + return null; + } + + protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException + { + return null; + } + + public void sendExchangeDeclare(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException + { + + } + + public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException, FailoverException + { + + } + + public void sendQueueDelete(AMQShortString queueName) throws AMQException, FailoverException + { + + } + + public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException + { + + } + + protected boolean tagLE(long tag1, long tag2) + { + return false; + } + + protected boolean updateRollbackMark(long current, long deliveryTag) + { + return false; + } + + public AMQMessageDelegateFactory getMessageDelegateFactory() + { + return AMQMessageDelegateFactory.FACTORY_0_8; + } + + protected Object getFailoverMutex() + { + return this; + } + + public void checkNotClosed() + { + + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java index 64ccb719b6..3ad6c021bd 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java +++ b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java @@ -29,7 +29,6 @@ import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.BytesMessage; import javax.jms.TextMessage; -import javax.jms.Queue; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -42,10 +41,10 @@ public class TestMessageFactory return session.createTextMessage(createMessagePayload(size)); } - public static JMSTextMessage newJMSTextMessage(int size, String encoding) throws JMSException + public static TextMessage newJMSTextMessage(Session session, int size, String encoding) throws JMSException { - ByteBuffer byteBuffer = (new SimpleByteBufferAllocator()).allocate(size, true); - JMSTextMessage message = new JMSTextMessage(byteBuffer, encoding); + + TextMessage message = session.createTextMessage(); message.clearBody(); message.setText(createMessagePayload(size)); return message; diff --git a/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java b/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java index 60a26c8e62..857adaf82c 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java +++ b/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java @@ -26,20 +26,22 @@ import java.util.Enumeration; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.ObjectMessage; +import javax.jms.Session; public class NonQpidObjectMessage implements ObjectMessage { - private JMSObjectMessage _realMessage; + private ObjectMessage _realMessage; private String _contentString; /** * Allows us to construct a JMS message which * does not inherit from the Qpid message superclasses * and expand our unit testing of MessageConverter et al + * @param session */ - public NonQpidObjectMessage() + public NonQpidObjectMessage(Session session) throws JMSException { - _realMessage = new JMSObjectMessage(); + _realMessage = session.createObjectMessage(); } public String getJMSMessageID() throws JMSException { diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 98639f6970..aafddb810a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -33,7 +33,7 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQMessageHandle; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -105,7 +105,7 @@ public class TxAckTest extends TestCase Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception { - TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(), + TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(), _storeContext, null, new LinkedList<RequiredDeliveryException>() ); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index afa0f84d71..08f78a3d28 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -34,7 +34,7 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -56,7 +56,7 @@ public class AckTest extends TestCase private MockProtocolSession _protocolSession; - private TestableMemoryMessageStore _messageStore; + private TestMemoryMessageStore _messageStore; private StoreContext _storeContext = new StoreContext(); @@ -74,7 +74,7 @@ public class AckTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _messageStore = new TestableMemoryMessageStore(); + _messageStore = new TestMemoryMessageStore(); _protocolSession = new MockProtocolSession(_messageStore); _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestMemoryMessageStore.java index 79d428fee8..4e48435962 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -31,9 +31,9 @@ import java.util.List; /** * Adds some extra methods to the memory message store for testing purposes. */ -public class TestableMemoryMessageStore extends MemoryMessageStore +public class TestMemoryMessageStore extends MemoryMessageStore { - public TestableMemoryMessageStore() + public TestMemoryMessageStore() { _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java index f36e924890..2346660d25 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -29,14 +29,13 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.AMQMessageHandle; -import org.apache.qpid.server.txn.NonTransactionalContext; /** * Tests that reference counting works correctly with AMQMessage and the message store */ public class TestReferenceCounting extends TestCase { - private TestableMemoryMessageStore _store; + private TestMemoryMessageStore _store; private StoreContext _storeContext = new StoreContext(); @@ -44,7 +43,7 @@ public class TestReferenceCounting extends TestCase protected void setUp() throws Exception { super.setUp(); - _store = new TestableMemoryMessageStore(); + _store = new TestMemoryMessageStore(); } /** diff --git a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java index b34f28b1a8..84d3d313d1 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.txn; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import java.util.LinkedList; @@ -194,7 +194,7 @@ public class TxnBufferTest extends TestCase } } - class MockStore extends TestableMemoryMessageStore + class MockStore extends TestMemoryMessageStore { final Object BEGIN = "BEGIN"; final Object ABORT = "ABORT"; diff --git a/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java index 6864b0a80d..e69de29bb2 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -1,131 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.util; - -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.management.ManagedObjectRegistry; -import org.apache.qpid.server.plugins.PluginManager; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; -import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; -import org.apache.qpid.server.security.access.ACLPlugin; -import org.apache.qpid.server.security.access.plugins.AllowAll; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.MapConfiguration; - -import java.util.HashMap; -import java.util.Collection; -import java.util.Properties; - -public class TestApplicationRegistry extends ApplicationRegistry -{ - private QueueRegistry _queueRegistry; - - private ExchangeRegistry _exchangeRegistry; - - private ExchangeFactory _exchangeFactory; - - private MessageStore _messageStore; - - private VirtualHost _vHost; - - public TestApplicationRegistry() - { - super(new MapConfiguration(new HashMap())); - } - - public void initialise() throws Exception - { - _logger.info("Initialising TestApplicationRegistry"); - - Properties users = new Properties(); - - users.put("guest", "guest"); - - _databaseManager = new PropertiesPrincipalDatabaseManager("default", users); - - _accessManager = new AllowAll(); - - _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); - - IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - _managedObjectRegistry = appRegistry.getManagedObjectRegistry(); - _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test"); - _queueRegistry = _vHost.getQueueRegistry(); - _exchangeFactory = _vHost.getExchangeFactory(); - _exchangeRegistry = _vHost.getExchangeRegistry(); - - _messageStore = new TestableMemoryMessageStore(); - - _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes - } - - public QueueRegistry getQueueRegistry() - { - return _queueRegistry; - } - - public ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - public ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; - } - - public Collection<String> getVirtualHostNames() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public VirtualHostRegistry getVirtualHostRegistry() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setAccessManager(ACLPlugin newManager) - { - _accessManager = newManager; - } - - public MessageStore getMessageStore() - { - return _messageStore; - } - - public PluginManager getPluginManager() - { - return null; - } -} - - diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java index 10705119e7..d2965bd52a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java @@ -114,9 +114,14 @@ public class ObjectMessageTest extends QpidTestCase implements MessageListener { synchronized (received) { + long endTime = System.currentTimeMillis() + 30000L; while (received.size() < count) { - received.wait(); + received.wait(30000); + if(received.size() < count && System.currentTimeMillis() > endTime) + { + throw new RuntimeException("Only received " + received.size() + " messages, was expecting " + count); + } } } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index ca896b08bb..d9390c33df 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -25,10 +25,12 @@ import junit.framework.Assert; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.message.AMQMessage; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +47,7 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.net.URISyntaxException; public class PropertyValueTest extends QpidTestCase implements MessageListener { @@ -183,43 +186,6 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener m.setShortProperty("Short", (short) Short.MAX_VALUE); m.setStringProperty("String", "Test"); - // AMQP Specific values - - // Timestamp - long nano = System.nanoTime(); - m.setStringProperty("time-str", String.valueOf(nano)); - ((AMQMessage) m).setTimestampProperty(new AMQShortString("time"), nano); - - // Decimal - BigDecimal bd = new BigDecimal(Integer.MAX_VALUE); - ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal"), bd.setScale(Byte.MAX_VALUE)); - - bd = new BigDecimal((long) Integer.MAX_VALUE + 1L); - - try - { - ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal-bad-value"), bd.setScale(Byte.MAX_VALUE)); - fail("UnsupportedOperationException should be thrown as value can't be correctly transmitted"); - } - catch (UnsupportedOperationException uoe) - { - // normal path. - } - - try - { - ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal-bad-scale"), - bd.setScale(Byte.MAX_VALUE + 1)); - fail("UnsupportedOperationException should be thrown as scale can't be correctly transmitted"); - } - catch (UnsupportedOperationException uoe) - { - // normal path. - } - - // Void - ((AMQMessage) m).setVoidProperty(new AMQShortString("void")); - _logger.debug("Sending Msg:" + m); producer.send(m); } @@ -236,7 +202,7 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener } } - void check() throws JMSException + void check() throws JMSException, URISyntaxException { List<String> actual = new ArrayList<String>(); for (JMSTextMessage m : received) @@ -259,8 +225,8 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener Assert.assertEquals("Check Priority properties are correctly transported", 8, m.getJMSPriority()); // Queue - Assert.assertEquals("Check ReplyTo properties are correctly transported", m.getStringProperty("TempQueue"), - m.getJMSReplyTo().toString()); + Assert.assertEquals("Check ReplyTo properties are correctly transported", AMQDestination.createDestination(new AMQBindingURL(m.getStringProperty("TempQueue"))), + m.getJMSReplyTo()); Assert.assertEquals("Check Type properties are correctly transported", "Test", m.getJMSType()); @@ -271,7 +237,7 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener Assert.assertEquals("Check Long properties are correctly transported", (long) Long.MAX_VALUE, m.getLongProperty("Long")); Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String")); - +/* // AMQP Tests Specific values Assert.assertEquals("Check Timestamp properties are correctly transported", m.getStringProperty("time-str"), @@ -288,7 +254,7 @@ public class PropertyValueTest extends QpidTestCase implements MessageListener Assert.assertTrue("Check void properties are correctly transported", ((AMQMessage) m).getPropertyHeaders().containsKey("void")); - +*/ //JMSXUserID if (m.getStringProperty("JMSXUserID") != null) { diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index 5bf99e719e..1f90f1e29f 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -76,7 +76,7 @@ public class JMSPropertiesTest extends QpidTestCase MessageProducer producer = producerSession.createProducer(queue); Destination JMS_REPLY_TO = new AMQQueue(con2, "my.replyto"); // create a test message to send - ObjectMessage sentMsg = new NonQpidObjectMessage(); + ObjectMessage sentMsg = new NonQpidObjectMessage(producerSession); sentMsg.setJMSCorrelationID(JMS_CORR_ID); sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE); sentMsg.setJMSType(JMS_TYPE); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 3027da00c7..6fa0172ae3 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -106,7 +106,8 @@ public class StreamMessageTest extends QpidTestCase _logger.info("Starting consumer connection"); con.start(); - StreamMessage msg2 = (StreamMessage) consumer.receive(); + StreamMessage msg2 = (StreamMessage) consumer.receive(2000); + assertNotNull(msg2); msg2.readByte(); try |