summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-09-13 12:35:48 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-09-13 12:35:48 +0000
commite00a1cfa3881e3bb8aadfecdf502f17903e319b1 (patch)
tree3571f0cf7a40bcd62271ced73a6ef385ddc46f9c
parentc9a48638e074b5c5db13ddd49dea03f0895baf5f (diff)
downloadqpid-python-e00a1cfa3881e3bb8aadfecdf502f17903e319b1.tar.gz
updated message hierarchy for using 0_10 messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575289 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java188
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java104
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java80
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java69
20 files changed, 317 insertions, 197 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index e4318f4c0d..aa6756d116 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -112,7 +112,7 @@ public class AMQSession_0_10 extends AMQSession
int defaultPrefetchLow)
{
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefault010Registry(),
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
defaultPrefetchHigh, defaultPrefetchLow);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 025054e710..180a1e663c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -100,7 +100,7 @@ public class AMQSession_0_8 extends AMQSession
AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
int defaultPrefetchLow)
{
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefault08Registry(), defaultPrefetchHigh,
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
defaultPrefetchLow);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index a11744a6d5..412c7e9a8a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -39,18 +39,18 @@ import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class BasicMessageConsumer<H,B> extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
- /** The connection being used by this consumer */
+ /**
+ * The connection being used by this consumer
+ */
private AMQConnection _connection;
private String _messageSelector;
@@ -59,15 +59,23 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
private AMQDestination _destination;
- /** When true indicates that a blocking receive call is in progress */
+ /**
+ * When true indicates that a blocking receive call is in progress
+ */
private final AtomicBoolean _receiving = new AtomicBoolean(false);
- /** Holds an atomic reference to the listener installed. */
+ /**
+ * Holds an atomic reference to the listener installed.
+ */
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
- /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
+ /**
+ * The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker
+ */
protected AMQShortString _consumerTag;
- /** We need to know the channel id when constructing frames */
+ /**
+ * We need to know the channel id when constructing frames
+ */
protected int _channelId;
/**
@@ -82,7 +90,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
protected AMQProtocolHandler _protocolHandler;
- /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */
+ /**
+ * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
+ */
private FieldTable _rawSelectorFieldTable;
/**
@@ -97,7 +107,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
*/
private int _prefetchLow;
- /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */
+ /**
+ * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
+ */
private boolean _exclusive;
/**
@@ -107,7 +119,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
*/
private int _acknowledgeMode;
- /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
+ /**
+ * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+ */
private int _outstanding;
/**
@@ -118,7 +132,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
- /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
+ /**
+ * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
+ */
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
/**
@@ -138,9 +154,10 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
- String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
+ AMQSession session, AMQProtocolHandler protocolHandler,
+ FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -216,8 +233,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
if (_logger.isDebugEnabled())
{
- _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination "
- + _destination);
+ _logger.debug(
+ "Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
}
}
else
@@ -418,9 +435,7 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
* it by throwing it (if an exception) or returning it (in any other case).
*
* @param o
- *
* @return a message only if o is a Message
- *
* @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a
* JMSException is created with the linked exception set appropriately
*/
@@ -465,8 +480,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " close():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace())
+ .subList(3, 6));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -518,8 +533,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " markClosed():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " markClosed():" + Arrays
+ .asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -588,14 +603,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
}
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
- {
+ public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H, B> messageFrame)
+ throws Exception;
- return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
- messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
-
- }
/**
* @param jmsMessage this message has already been processed so can't redo preDeliver
@@ -643,77 +653,79 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
switch (_acknowledgeMode)
{
- case Session.PRE_ACKNOWLEDGE:
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- break;
+ case Session.PRE_ACKNOWLEDGE:
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ break;
- case Session.CLIENT_ACKNOWLEDGE:
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
- break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ break;
}
}
- void postDeliver(AbstractJMSMessage msg) throws JMSException
+ void postDeliver(AbstractJMSMessage msg) throws JMSException
{
msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
- break;
+ break;
- case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetchHigh)
- {
- _dups_ok_acknowledge_send = true;
- }
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
- if (_outstanding <= _prefetchLow)
- {
- _dups_ok_acknowledge_send = false;
- }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
- if (_dups_ok_acknowledge_send)
- {
- if (!_session.isInRecovery())
+ if (_dups_ok_acknowledge_send)
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ }
}
- }
- break;
+ break;
- case Session.AUTO_ACKNOWLEDGE:
- // we do not auto ack a message if the application code called recover()
- if (!_session.isInRecovery())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ case Session.AUTO_ACKNOWLEDGE:
+ // we do not auto ack a message if the application code called recover()
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
- break;
+ break;
- case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _receivedDeliveryTags.add(msg.getDeliveryTag());
- }
+ case Session.SESSION_TRANSACTED:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
+ }
- break;
+ break;
}
}
- /** Acknowledge up to last message delivered (if any). Used when commiting. */
+ /**
+ * Acknowledge up to last message delivered (if any). Used when commiting.
+ */
void acknowledgeLastDelivered()
{
if (!_receivedDeliveryTags.isEmpty())
@@ -740,8 +752,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " notifyError():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " notifyError():" + Arrays
+ .asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously" + _closedStack.toString());
}
else
@@ -819,7 +831,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
}
}
- /** Called on recovery to reset the list of delivery tags */
+ /**
+ * Called on recovery to reset the list of delivery tags
+ */
public void clearUnackedMessages()
{
_unacknowledgedDeliveryTags.clear();
@@ -860,8 +874,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
- + "for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _receivedDeliveryTags
+ .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag);
}
Long tag = _receivedDeliveryTags.poll();
@@ -890,8 +904,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
- + "for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _synchronousQueue
+ .size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + _consumerTag);
}
Iterator iterator = _synchronousQueue.iterator();
@@ -914,8 +928,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes
}
else
{
- _logger.error("Queue contained a :" + o.getClass()
- + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ _logger.error("Queue contained a :" + o
+ .getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
iterator.remove();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index f2019ea43e..ec27fdbb71 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -32,11 +32,12 @@ import org.apache.qpidity.Struct;
import javax.jms.JMSException;
import java.io.IOException;
+import java.nio.ByteBuffer;
/**
* This is a 0.10 message consumer.
*/
-public class BasicMessageConsumer_0_10 extends BasicMessageConsumer
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
implements org.apache.qpidity.client.util.MessageListener
{
/**
@@ -108,5 +109,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer
((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
super.postDeliver(msg);
}
-
+
+
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+ {
+ return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
+ messageFrame.isRedelivered(), messageFrame.getExchange(),
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ }
} \ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 80985daea4..c82ed96e4c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -75,4 +75,13 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
}
+
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
+ {
+
+ return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
+ messageFrame.isRedelivered(), messageFrame.getExchange(),
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+
+ }
} \ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 435efdf2bd..8ecb5ffd78 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -21,19 +21,19 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpidity.jms.message.MessageImpl;
-import org.apache.qpidity.jms.message.MessageHelper;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpidity.jms.ExceptionHelper;
-import org.apache.qpidity.QpidException;
+import org.apache.qpidity.client.util.ByteBufferMessage;
+import org.apache.qpidity.ReplyTo;
import javax.jms.Message;
import javax.jms.JMSException;
-import java.util.UUID;
import java.io.IOException;
/**
- *
- * This is a 0_10 message producer.
+ * This is a 0_10 message producer.
*/
public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
@@ -66,85 +66,79 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
//--- Overwritten methods
-
/**
* Sends a message to a given destination
- * We will always convert the received message
*/
public void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
boolean wait) throws JMSException
{
- // Only get current time if required
- long currentTime = Long.MIN_VALUE;
- if (!((timeToLive == 0) && _disableTimestamps))
+ message.prepareForSending();
+ org.apache.qpidity.api.Message qpidityMessage = new ByteBufferMessage();
+ // set the payload
+ try
{
- currentTime = System.currentTimeMillis();
+ qpidityMessage.appendData(message.getData().buf());
}
- // the messae UID
- String uid = (getDisableMessageID()) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString();
- MessageImpl qpidMessage;
- // check that the message is not a foreign one
- try
+ catch (IOException e)
{
- qpidMessage = (MessageImpl) origMessage;
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- catch (ClassCastException cce)
+ // set the delivery properties
+ if (!_disableTimestamps)
{
- // this is a foreign message
- qpidMessage = MessageHelper.transformMessage(origMessage);
- // set message's properties in case they are queried after send.
- origMessage.setJMSDestination(destination);
- origMessage.setJMSDeliveryMode(deliveryMode);
- origMessage.setJMSPriority(priority);
- origMessage.setJMSMessageID(uid);
- if (timeToLive != 0)
+ final long currentTime = System.currentTimeMillis();
+ qpidityMessage.getDeliveryProperties().setTimestamp(currentTime);
+ if (timeToLive > 0)
{
- origMessage.setJMSExpiration(timeToLive + currentTime);
- _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+ qpidityMessage.getDeliveryProperties().setExpiration(currentTime + timeToLive);
}
else
{
- origMessage.setJMSExpiration(timeToLive);
+ qpidityMessage.getDeliveryProperties().setExpiration(0);
}
- origMessage.setJMSTimestamp(currentTime);
}
- // set the message properties
- qpidMessage.setJMSDestination(destination);
- qpidMessage.setJMSMessageID(uid);
- qpidMessage.setJMSDeliveryMode(deliveryMode);
- qpidMessage.setJMSPriority(priority);
- if (timeToLive != 0)
+ qpidityMessage.getDeliveryProperties().setDeliveryMode((byte) deliveryMode);
+ qpidityMessage.getDeliveryProperties().setPriority((byte) priority);
+ qpidityMessage.getDeliveryProperties().setExchange(destination.getExchangeName().toString());
+ qpidityMessage.getDeliveryProperties().setRoutingKey(destination.getRoutingKey().toString());
+ BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+ // set the application properties
+ qpidityMessage.getMessageProperties().setContentType(contentHeaderProperties.getContentType().toString());
+ qpidityMessage.getMessageProperties().setCorrelationId(contentHeaderProperties.getCorrelationId().toString());
+ String replyToURL = contentHeaderProperties.getReplyToAsString();
+ if (replyToURL != null)
{
- qpidMessage.setJMSExpiration(timeToLive + currentTime);
- }
- else
- {
- qpidMessage.setJMSExpiration(timeToLive);
- }
- qpidMessage.setJMSTimestamp(currentTime);
- qpidMessage.setRoutingKey(destination.getDestinationName().toString());
- qpidMessage.setExchangeName(destination.getExchangeName().toString());
- // call beforeMessageDispatch
- try
- {
- qpidMessage.beforeMessageDispatch();
+ AMQBindingURL dest;
+ try
+ {
+ dest = new AMQBindingURL(replyToURL);
+ }
+ catch (URLSyntaxException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ qpidityMessage.getMessageProperties()
+ .setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
}
- catch (QpidException e)
+ if (contentHeaderProperties.getHeaders() != null)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ // todo use the new fieldTable
+ qpidityMessage.getMessageProperties().setApplicationHeaders(null);
}
+ // send the message
try
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(qpidMessage.getExchangeName(),
- qpidMessage.getQpidityMessage(),
+ ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(destination.getExchangeName().toString(),
+ qpidityMessage,
org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
}
catch (IOException e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ }
+
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index af254fbbaf..94be090cf2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -72,11 +72,11 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
_data.setAutoExpand(true);
}
- AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AbstractBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, ByteBuffer data) throws AMQException
{
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
- super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
index 3b8ce9a98a..5904131122 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
@@ -34,6 +34,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
/**
* @author Apache Software Foundation
@@ -86,7 +87,7 @@ public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
}
- AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, ByteBuffer data) throws AMQException
{
super(messageNbr, contentHeader, exchange, routingKey, data);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 87df7e1337..b115086d71 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -26,6 +26,10 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.DeliveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,10 +44,12 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange,
- AMQShortString routingKey, ContentHeaderBody contentHeader) throws AMQException;
+ AMQShortString routingKey,
+ BasicContentHeaderProperties contentHeader) throws AMQException;
- protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException
+ protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
+ List bodies) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
@@ -62,8 +68,8 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
{
if (debug)
{
- _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize
- + ")");
+ _logger.debug("Fragmented message body (" + bodies
+ .size() + " frames, bodySize=" + contentHeader.bodySize + ")");
}
data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem?
@@ -84,17 +90,71 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
if (debug)
{
- _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining="
- + data.remaining());
+ _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
+ .remaining());
}
- return createMessage(messageNbr, data, exchange, routingKey, contentHeader);
+ return createMessage(messageNbr, data, exchange, routingKey,
+ (BasicContentHeaderProperties) contentHeader.properties);
}
+ protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
+ List bodies) throws AMQException
+ {
+ ByteBuffer data;
+ final boolean debug = _logger.isDebugEnabled();
+
+ // we optimise the non-fragmented case to avoid copying
+ if ((bodies != null))
+ {
+ data = ByteBuffer.wrap((java.nio.ByteBuffer) bodies.get(0));
+ }
+ else // bodies == null
+ {
+ data = ByteBuffer.allocate(0);
+ }
+
+ if (debug)
+ {
+ _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
+ .remaining());
+ }
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ // set the properties of this message
+ MessageProperties mprop = (MessageProperties) contentHeader[0];
+ DeliveryProperties devprop = (DeliveryProperties) contentHeader[1];
+ props.setContentType(mprop.getContentType());
+ props.setCorrelationId(mprop.getCorrelationId());
+ props.setEncoding(mprop.getContentEncoding());
+ props.setExpiration(devprop.getExpiration());
+ // todo update when fieldtable is used props.setHeaders(mprop.getApplicationHeaders());
+ props.setMessageId(mprop.getMessageId());
+ props.setPriority((byte) devprop.getPriority());
+ // todo we need to match the reply to props.setReplyTo(new AMQShortString(mprop.getReplyTo()));
+ props.setTimestamp(devprop.getTimestamp());
+ props.setType(mprop.getType());
+ props.setUserId(mprop.getUserId());
+ return createMessage(messageNbr, data, exchange, routingKey, props);
+ }
+
+
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException
+ AMQShortString exchange, AMQShortString routingKey, List bodies)
+ throws JMSException, AMQException
+ {
+ final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+ msg.setJMSRedelivered(redelivered);
+
+ return msg;
+ }
+
+ public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
+ AMQShortString exchange, AMQShortString routingKey, List bodies)
+ throws JMSException, AMQException
{
- final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+ final AbstractJMSMessage msg =
+ create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
msg.setJMSRedelivered(redelivered);
return msg;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index 19382b58c3..4f5641bcff 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -34,6 +34,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
{
@@ -57,7 +58,7 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
super(data); // this instanties a content header
}
- JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ JMSBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, ByteBuffer data) throws AMQException
{
super(messageNbr, contentHeader, exchange, routingKey, data);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
index fd2aae9feb..0202dc29df 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
@@ -26,12 +26,13 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
{
protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException
+ BasicContentHeaderProperties contentHeader) throws AMQException
{
return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
@@ -40,4 +41,7 @@ public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
{
return new JMSBytesMessage();
}
+
+ // 0_10 specific
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index 495f09e8fd..1fb5e637c9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -25,6 +25,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
populateMapFromData();
}
- JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+ JMSMapMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
ByteBuffer data) throws AMQException
{
super(messageNbr, contentHeader, exchange, routingKey, data);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
index a6b9bb29a4..7cb8b637e6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
@@ -26,6 +26,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSMapMessageFactory extends AbstractJMSMessageFactory
{
@@ -36,8 +37,9 @@ public class JMSMapMessageFactory extends AbstractJMSMessageFactory
protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException
+ BasicContentHeaderProperties contentHeader) throws AMQException
{
return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index caf8741280..385eee47c9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -69,10 +69,10 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+ JMSObjectMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
ByteBuffer data) throws AMQException
{
- super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
}
public void clearBodyImpl() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
index 57ac4fb006..e7369dcb26 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
@@ -26,12 +26,13 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
{
protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException
+ BasicContentHeaderProperties contentHeader) throws AMQException
{
return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index b4350c7a98..62f3150ed1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -27,6 +27,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
/**
* @author Apache Software Foundation
@@ -60,7 +61,7 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
}
- JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ JMSStreamMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, ByteBuffer data) throws AMQException
{
super(messageNbr, contentHeader, exchange, routingKey, data);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
index c34ee7175d..4bb648e090 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
@@ -26,16 +26,16 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
{
protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException
+ BasicContentHeaderProperties contentHeader) throws AMQException
{
return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
-
public AbstractJMSMessage createMessage() throws JMSException
{
return new JMSStreamMessage();
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
index c5942dbe2a..c578c15a6a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
@@ -38,9 +38,9 @@ public class JMSTextMessageFactory extends AbstractJMSMessageFactory
protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException
+ BasicContentHeaderProperties contentHeader) throws AMQException
{
- return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties,
+ return new JMSTextMessage(deliveryTag, contentHeader,
exchange, routingKey, data);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
index 0fe4af715d..5c1ee713fc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
@@ -27,6 +27,7 @@ import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpidity.Struct;
public interface MessageFactory
@@ -37,5 +38,11 @@ public interface MessageFactory
List bodies)
throws JMSException, AMQException;
+ AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ Struct[] contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
+ List bodies)
+ throws JMSException, AMQException;
+
AbstractJMSMessage createMessage() throws JMSException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 2d71af56c0..b60fc26fc0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -30,18 +30,29 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MessageFactoryRegistry
{
+ /**
+ * This class logger
+ */
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap =
- new HashMap<AMQShortString, MessageFactory>();
+ new HashMap<AMQShortString, MessageFactory>();
/**
* Construct a new registry with the default message factories registered
+ *
* @return a message factory registry
*/
- public static MessageFactoryRegistry newDefault08Registry()
+ public static MessageFactoryRegistry newDefaultRegistry()
{
MessageFactoryRegistry mf = new MessageFactoryRegistry();
mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
@@ -55,24 +66,6 @@ public class MessageFactoryRegistry
return mf;
}
- /**
- * Construct a new 010 registry with the default message factories registered
- * @return a message factory registry
- */
- public static MessageFactoryRegistry newDefault010Registry()
- {
- // TODO use 0.10 classes
- MessageFactoryRegistry mf = new MessageFactoryRegistry();
- mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
- mf.registerFactory("text/plain", new JMSTextMessageFactory());
- mf.registerFactory("text/xml", new JMSTextMessageFactory());
- mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
- mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
- mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
- mf.registerFactory(null, new JMSBytesMessageFactory());
-
- return mf;
- }
public void registerFactory(String mimeType, MessageFactory mf)
@@ -96,25 +89,26 @@ public class MessageFactoryRegistry
/**
* Create a message. This looks up the MIME type from the content header and instantiates the appropriate
* concrete message type.
- * @param deliveryTag the AMQ message id
- * @param redelivered true if redelivered
+ *
+ * @param deliveryTag the AMQ message id
+ * @param redelivered true if redelivered
* @param contentHeader the content header that was received
- * @param bodies a list of ContentBody instances
+ * @param bodies a list of ContentBody instances
* @return the message.
* @throws AMQException
* @throws JMSException
*/
public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
- throws AMQException, JMSException
+ throws AMQException, JMSException
{
BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
AMQShortString contentTypeShortString = properties.getContentType();
- contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE)
- : contentTypeShortString;
+ contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(
+ JMSBytesMessage.MIME_TYPE) : contentTypeShortString;
MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
if (mf == null)
@@ -127,6 +121,29 @@ public class MessageFactoryRegistry
}
}
+ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
+ AMQShortString routingKey, Struct[] contentHeader, List bodies)
+ throws AMQException, JMSException
+ {
+ MessageProperties mprop = (MessageProperties) contentHeader[0];
+ String messageType = mprop.getContentType();
+ if (messageType == null)
+ {
+ _logger.debug("no message type specified, building a byte message");
+ messageType = JMSBytesMessage.MIME_TYPE;
+ }
+ MessageFactory mf = _mimeShortStringToFactoryMap.get(new AMQShortString(messageType));
+ if (mf == null)
+ {
+ throw new AMQException(null, "Unsupport MIME type of " + messageType, null);
+ }
+ else
+ {
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
+ }
+ }
+
+
public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException
{
if (mimeType == null)