diff options
20 files changed, 317 insertions, 197 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index e4318f4c0d..aa6756d116 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -112,7 +112,7 @@ public class AMQSession_0_10 extends AMQSession int defaultPrefetchLow) { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefault010Registry(), + this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 025054e710..180a1e663c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -100,7 +100,7 @@ public class AMQSession_0_8 extends AMQSession AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefault08Registry(), defaultPrefetchHigh, + this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index a11744a6d5..412c7e9a8a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -39,18 +39,18 @@ import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class BasicMessageConsumer<H,B> extends Closeable implements MessageConsumer +public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); - /** The connection being used by this consumer */ + /** + * The connection being used by this consumer + */ private AMQConnection _connection; private String _messageSelector; @@ -59,15 +59,23 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes private AMQDestination _destination; - /** When true indicates that a blocking receive call is in progress */ + /** + * When true indicates that a blocking receive call is in progress + */ private final AtomicBoolean _receiving = new AtomicBoolean(false); - /** Holds an atomic reference to the listener installed. */ + /** + * Holds an atomic reference to the listener installed. + */ private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>(); - /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ + /** + * The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker + */ protected AMQShortString _consumerTag; - /** We need to know the channel id when constructing frames */ + /** + * We need to know the channel id when constructing frames + */ protected int _channelId; /** @@ -82,7 +90,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes protected AMQProtocolHandler _protocolHandler; - /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ + /** + * We need to store the "raw" field table so that we can resubscribe in the event of failover being required + */ private FieldTable _rawSelectorFieldTable; /** @@ -97,7 +107,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes */ private int _prefetchLow; - /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ + /** + * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover + */ private boolean _exclusive; /** @@ -107,7 +119,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes */ private int _acknowledgeMode; - /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ + /** + * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode + */ private int _outstanding; /** @@ -118,7 +132,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); - /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ + /** + * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. + */ private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); /** @@ -138,9 +154,10 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes private List<StackTraceElement> _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, - String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, + AMQSession session, AMQProtocolHandler protocolHandler, + FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -216,8 +233,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes if (_logger.isDebugEnabled()) { - _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " - + _destination); + _logger.debug( + "Session stopped : Message listener(" + messageListener + ") set for destination " + _destination); } } else @@ -418,9 +435,7 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes * it by throwing it (if an exception) or returning it (in any other case). * * @param o - * * @return a message only if o is a Message - * * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a * JMSException is created with the linked exception set appropriately */ @@ -465,8 +480,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes { if (_closedStack != null) { - _logger.trace(_consumerTag + " close():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()) + .subList(3, 6)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -518,8 +533,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes { if (_closedStack != null) { - _logger.trace(_consumerTag + " markClosed():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " markClosed():" + Arrays + .asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -588,14 +603,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes } } - public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception - { + public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H, B> messageFrame) + throws Exception; - return _messageFactory.createMessage(messageFrame.getDeliveryTag(), - messageFrame.isRedelivered(), messageFrame.getExchange(), - messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); - - } /** * @param jmsMessage this message has already been processed so can't redo preDeliver @@ -643,77 +653,79 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - break; + case Session.PRE_ACKNOWLEDGE: + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); - break; + case Session.CLIENT_ACKNOWLEDGE: + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + break; } } - void postDeliver(AbstractJMSMessage msg) throws JMSException + void postDeliver(AbstractJMSMessage msg) throws JMSException { msg.setJMSDestination(_destination); switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.CLIENT_ACKNOWLEDGE: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } + case Session.DUPS_OK_ACKNOWLEDGE: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } - if (_outstanding <= _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } - if (_dups_ok_acknowledge_send) - { - if (!_session.isInRecovery()) + if (_dups_ok_acknowledge_send) { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), true); + } } - } - break; + break; - case Session.AUTO_ACKNOWLEDGE: - // we do not auto ack a message if the application code called recover() - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.AUTO_ACKNOWLEDGE: + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - _receivedDeliveryTags.add(msg.getDeliveryTag()); - } + case Session.SESSION_TRANSACTED: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _receivedDeliveryTags.add(msg.getDeliveryTag()); + } - break; + break; } } - /** Acknowledge up to last message delivered (if any). Used when commiting. */ + /** + * Acknowledge up to last message delivered (if any). Used when commiting. + */ void acknowledgeLastDelivered() { if (!_receivedDeliveryTags.isEmpty()) @@ -740,8 +752,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes { if (_closedStack != null) { - _logger.trace(_consumerTag + " notifyError():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " notifyError():" + Arrays + .asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously" + _closedStack.toString()); } else @@ -819,7 +831,9 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes } } - /** Called on recovery to reset the list of delivery tags */ + /** + * Called on recovery to reset the list of delivery tags + */ public void clearUnackedMessages() { _unacknowledgedDeliveryTags.clear(); @@ -860,8 +874,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" - + "for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _receivedDeliveryTags + .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag); } Long tag = _receivedDeliveryTags.poll(); @@ -890,8 +904,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" - + "for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _synchronousQueue + .size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); @@ -914,8 +928,8 @@ public abstract class BasicMessageConsumer<H,B> extends Closeable implements Mes } else { - _logger.error("Queue contained a :" + o.getClass() - + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + _logger.error("Queue contained a :" + o + .getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); iterator.remove(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index f2019ea43e..ec27fdbb71 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -32,11 +32,12 @@ import org.apache.qpidity.Struct; import javax.jms.JMSException; import java.io.IOException; +import java.nio.ByteBuffer; /** * This is a 0.10 message consumer. */ -public class BasicMessageConsumer_0_10 extends BasicMessageConsumer +public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer> implements org.apache.qpidity.client.util.MessageListener { /** @@ -108,5 +109,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer ((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag()); super.postDeliver(msg); } - + + + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception + { + return _messageFactory.createMessage(messageFrame.getDeliveryTag(), + messageFrame.isRedelivered(), messageFrame.getExchange(), + messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + } }
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 80985daea4..c82ed96e4c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -75,4 +75,13 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader throw new JMSAMQException("FailoverException interrupted basic cancel.", e); } } + + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception + { + + return _messageFactory.createMessage(messageFrame.getDeliveryTag(), + messageFrame.isRedelivered(), messageFrame.getExchange(), + messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + + } }
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 435efdf2bd..8ecb5ffd78 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -21,19 +21,19 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpidity.jms.message.MessageImpl; -import org.apache.qpidity.jms.message.MessageHelper; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.URLSyntaxException; import org.apache.qpidity.jms.ExceptionHelper; -import org.apache.qpidity.QpidException; +import org.apache.qpidity.client.util.ByteBufferMessage; +import org.apache.qpidity.ReplyTo; import javax.jms.Message; import javax.jms.JMSException; -import java.util.UUID; import java.io.IOException; /** - * - * This is a 0_10 message producer. + * This is a 0_10 message producer. */ public class BasicMessageProducer_0_10 extends BasicMessageProducer { @@ -66,85 +66,79 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer //--- Overwritten methods - /** * Sends a message to a given destination - * We will always convert the received message */ public void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException { - // Only get current time if required - long currentTime = Long.MIN_VALUE; - if (!((timeToLive == 0) && _disableTimestamps)) + message.prepareForSending(); + org.apache.qpidity.api.Message qpidityMessage = new ByteBufferMessage(); + // set the payload + try { - currentTime = System.currentTimeMillis(); + qpidityMessage.appendData(message.getData().buf()); } - // the messae UID - String uid = (getDisableMessageID()) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString(); - MessageImpl qpidMessage; - // check that the message is not a foreign one - try + catch (IOException e) { - qpidMessage = (MessageImpl) origMessage; + throw ExceptionHelper.convertQpidExceptionToJMSException(e); } - catch (ClassCastException cce) + // set the delivery properties + if (!_disableTimestamps) { - // this is a foreign message - qpidMessage = MessageHelper.transformMessage(origMessage); - // set message's properties in case they are queried after send. - origMessage.setJMSDestination(destination); - origMessage.setJMSDeliveryMode(deliveryMode); - origMessage.setJMSPriority(priority); - origMessage.setJMSMessageID(uid); - if (timeToLive != 0) + final long currentTime = System.currentTimeMillis(); + qpidityMessage.getDeliveryProperties().setTimestamp(currentTime); + if (timeToLive > 0) { - origMessage.setJMSExpiration(timeToLive + currentTime); - _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); + qpidityMessage.getDeliveryProperties().setExpiration(currentTime + timeToLive); } else { - origMessage.setJMSExpiration(timeToLive); + qpidityMessage.getDeliveryProperties().setExpiration(0); } - origMessage.setJMSTimestamp(currentTime); } - // set the message properties - qpidMessage.setJMSDestination(destination); - qpidMessage.setJMSMessageID(uid); - qpidMessage.setJMSDeliveryMode(deliveryMode); - qpidMessage.setJMSPriority(priority); - if (timeToLive != 0) + qpidityMessage.getDeliveryProperties().setDeliveryMode((byte) deliveryMode); + qpidityMessage.getDeliveryProperties().setPriority((byte) priority); + qpidityMessage.getDeliveryProperties().setExchange(destination.getExchangeName().toString()); + qpidityMessage.getDeliveryProperties().setRoutingKey(destination.getRoutingKey().toString()); + BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); + // set the application properties + qpidityMessage.getMessageProperties().setContentType(contentHeaderProperties.getContentType().toString()); + qpidityMessage.getMessageProperties().setCorrelationId(contentHeaderProperties.getCorrelationId().toString()); + String replyToURL = contentHeaderProperties.getReplyToAsString(); + if (replyToURL != null) { - qpidMessage.setJMSExpiration(timeToLive + currentTime); - } - else - { - qpidMessage.setJMSExpiration(timeToLive); - } - qpidMessage.setJMSTimestamp(currentTime); - qpidMessage.setRoutingKey(destination.getDestinationName().toString()); - qpidMessage.setExchangeName(destination.getExchangeName().toString()); - // call beforeMessageDispatch - try - { - qpidMessage.beforeMessageDispatch(); + AMQBindingURL dest; + try + { + dest = new AMQBindingURL(replyToURL); + } + catch (URLSyntaxException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + qpidityMessage.getMessageProperties() + .setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString())); } - catch (QpidException e) + if (contentHeaderProperties.getHeaders() != null) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + // todo use the new fieldTable + qpidityMessage.getMessageProperties().setApplicationHeaders(null); } + // send the message try { - ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(qpidMessage.getExchangeName(), - qpidMessage.getQpidityMessage(), + ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(destination.getExchangeName().toString(), + qpidityMessage, org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); } catch (IOException e) { throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index af254fbbaf..94be090cf2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -72,11 +72,11 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage _data.setAutoExpand(true); } - AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AbstractBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey, ByteBuffer data) throws AMQException { // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); + super(messageNbr, contentHeader, exchange, routingKey, data); getContentHeaderProperties().setContentType(getMimeTypeAsShortString()); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index 3b8ce9a98a..5904131122 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -34,6 +34,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
/**
* @author Apache Software Foundation
@@ -86,7 +87,7 @@ public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage }
- AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, ByteBuffer data) throws AMQException
{
super(messageNbr, contentHeader, exchange, routingKey, data);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 87df7e1337..b115086d71 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -26,6 +26,10 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpidity.Struct; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.DeliveryProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,10 +44,12 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class); protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange, - AMQShortString routingKey, ContentHeaderBody contentHeader) throws AMQException; + AMQShortString routingKey, + BasicContentHeaderProperties contentHeader) throws AMQException; - protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader, - AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException + protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader, + AMQShortString exchange, AMQShortString routingKey, + List bodies) throws AMQException { ByteBuffer data; final boolean debug = _logger.isDebugEnabled(); @@ -62,8 +68,8 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory { if (debug) { - _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize - + ")"); + _logger.debug("Fragmented message body (" + bodies + .size() + " frames, bodySize=" + contentHeader.bodySize + ")"); } data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem? @@ -84,17 +90,71 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory if (debug) { - _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" - + data.remaining()); + _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data + .remaining()); } - return createMessage(messageNbr, data, exchange, routingKey, contentHeader); + return createMessage(messageNbr, data, exchange, routingKey, + (BasicContentHeaderProperties) contentHeader.properties); } + protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader, + AMQShortString exchange, AMQShortString routingKey, + List bodies) throws AMQException + { + ByteBuffer data; + final boolean debug = _logger.isDebugEnabled(); + + // we optimise the non-fragmented case to avoid copying + if ((bodies != null)) + { + data = ByteBuffer.wrap((java.nio.ByteBuffer) bodies.get(0)); + } + else // bodies == null + { + data = ByteBuffer.allocate(0); + } + + if (debug) + { + _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data + .remaining()); + } + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + // set the properties of this message + MessageProperties mprop = (MessageProperties) contentHeader[0]; + DeliveryProperties devprop = (DeliveryProperties) contentHeader[1]; + props.setContentType(mprop.getContentType()); + props.setCorrelationId(mprop.getCorrelationId()); + props.setEncoding(mprop.getContentEncoding()); + props.setExpiration(devprop.getExpiration()); + // todo update when fieldtable is used props.setHeaders(mprop.getApplicationHeaders()); + props.setMessageId(mprop.getMessageId()); + props.setPriority((byte) devprop.getPriority()); + // todo we need to match the reply to props.setReplyTo(new AMQShortString(mprop.getReplyTo())); + props.setTimestamp(devprop.getTimestamp()); + props.setType(mprop.getType()); + props.setUserId(mprop.getUserId()); + return createMessage(messageNbr, data, exchange, routingKey, props); + } + + public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader, - AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException + AMQShortString exchange, AMQShortString routingKey, List bodies) + throws JMSException, AMQException + { + final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); + msg.setJMSRedelivered(redelivered); + + return msg; + } + + public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader, + AMQShortString exchange, AMQShortString routingKey, List bodies) + throws JMSException, AMQException { - final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); + final AbstractJMSMessage msg = + create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); msg.setJMSRedelivered(redelivered); return msg; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 19382b58c3..4f5641bcff 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -34,6 +34,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage { @@ -57,7 +58,7 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag super(data); // this instanties a content header } - JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + JMSBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey, ByteBuffer data) throws AMQException { super(messageNbr, contentHeader, exchange, routingKey, data); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java index fd2aae9feb..0202dc29df 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java @@ -26,12 +26,13 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSBytesMessageFactory extends AbstractJMSMessageFactory { protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, AMQShortString exchange, AMQShortString routingKey, - ContentHeaderBody contentHeader) throws AMQException + BasicContentHeaderProperties contentHeader) throws AMQException { return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data); } @@ -40,4 +41,7 @@ public class JMSBytesMessageFactory extends AbstractJMSMessageFactory { return new JMSBytesMessage(); } + + // 0_10 specific + } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index 495f09e8fd..1fb5e637c9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -25,6 +25,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +59,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm populateMapFromData(); } - JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, + JMSMapMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey, ByteBuffer data) throws AMQException { super(messageNbr, contentHeader, exchange, routingKey, data); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java index a6b9bb29a4..7cb8b637e6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java @@ -26,6 +26,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSMapMessageFactory extends AbstractJMSMessageFactory { @@ -36,8 +37,9 @@ public class JMSMapMessageFactory extends AbstractJMSMessageFactory protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, AMQShortString exchange, AMQShortString routingKey, - ContentHeaderBody contentHeader) throws AMQException + BasicContentHeaderProperties contentHeader) throws AMQException { return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index caf8741280..385eee47c9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -69,10 +69,10 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag /** * Creates read only message for delivery to consumers */ - JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, + JMSObjectMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey, ByteBuffer data) throws AMQException { - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); + super(messageNbr, contentHeader, exchange, routingKey, data); } public void clearBodyImpl() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java index 57ac4fb006..e7369dcb26 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java @@ -26,12 +26,13 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSObjectMessageFactory extends AbstractJMSMessageFactory { protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, AMQShortString exchange, AMQShortString routingKey, - ContentHeaderBody contentHeader) throws AMQException + BasicContentHeaderProperties contentHeader) throws AMQException { return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index b4350c7a98..62f3150ed1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -27,6 +27,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; /** * @author Apache Software Foundation @@ -60,7 +61,7 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea } - JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + JMSStreamMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey, ByteBuffer data) throws AMQException { super(messageNbr, contentHeader, exchange, routingKey, data); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java index c34ee7175d..4bb648e090 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java @@ -26,16 +26,16 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; public class JMSStreamMessageFactory extends AbstractJMSMessageFactory { protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, AMQShortString exchange, AMQShortString routingKey, - ContentHeaderBody contentHeader) throws AMQException + BasicContentHeaderProperties contentHeader) throws AMQException { return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data); } - public AbstractJMSMessage createMessage() throws JMSException { return new JMSStreamMessage(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java index c5942dbe2a..c578c15a6a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java @@ -38,9 +38,9 @@ public class JMSTextMessageFactory extends AbstractJMSMessageFactory protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, AMQShortString exchange, AMQShortString routingKey, - ContentHeaderBody contentHeader) throws AMQException + BasicContentHeaderProperties contentHeader) throws AMQException { - return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, + return new JMSTextMessage(deliveryTag, contentHeader, exchange, routingKey, data); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 0fe4af715d..5c1ee713fc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -27,6 +27,7 @@ import javax.jms.JMSException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpidity.Struct; public interface MessageFactory @@ -37,5 +38,11 @@ public interface MessageFactory List bodies) throws JMSException, AMQException; + AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, + Struct[] contentHeader, + AMQShortString exchange, AMQShortString routingKey, + List bodies) + throws JMSException, AMQException; + AbstractJMSMessage createMessage() throws JMSException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 2d71af56c0..b60fc26fc0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -30,18 +30,29 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpidity.Struct; +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MessageFactoryRegistry { + /** + * This class logger + */ + protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>(); private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = - new HashMap<AMQShortString, MessageFactory>(); + new HashMap<AMQShortString, MessageFactory>(); /** * Construct a new registry with the default message factories registered + * * @return a message factory registry */ - public static MessageFactoryRegistry newDefault08Registry() + public static MessageFactoryRegistry newDefaultRegistry() { MessageFactoryRegistry mf = new MessageFactoryRegistry(); mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory()); @@ -55,24 +66,6 @@ public class MessageFactoryRegistry return mf; } - /** - * Construct a new 010 registry with the default message factories registered - * @return a message factory registry - */ - public static MessageFactoryRegistry newDefault010Registry() - { - // TODO use 0.10 classes - MessageFactoryRegistry mf = new MessageFactoryRegistry(); - mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory()); - mf.registerFactory("text/plain", new JMSTextMessageFactory()); - mf.registerFactory("text/xml", new JMSTextMessageFactory()); - mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory()); - mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); - mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); - mf.registerFactory(null, new JMSBytesMessageFactory()); - - return mf; - } public void registerFactory(String mimeType, MessageFactory mf) @@ -96,25 +89,26 @@ public class MessageFactoryRegistry /** * Create a message. This looks up the MIME type from the content header and instantiates the appropriate * concrete message type. - * @param deliveryTag the AMQ message id - * @param redelivered true if redelivered + * + * @param deliveryTag the AMQ message id + * @param redelivered true if redelivered * @param contentHeader the content header that was received - * @param bodies a list of ContentBody instances + * @param bodies a list of ContentBody instances * @return the message. * @throws AMQException * @throws JMSException */ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies) - throws AMQException, JMSException + throws AMQException, JMSException { BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over // AMQP. When the type is null, it can only be assumed that the message is a byte message. AMQShortString contentTypeShortString = properties.getContentType(); - contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE) - : contentTypeShortString; + contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString( + JMSBytesMessage.MIME_TYPE) : contentTypeShortString; MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString); if (mf == null) @@ -127,6 +121,29 @@ public class MessageFactoryRegistry } } + public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, + AMQShortString routingKey, Struct[] contentHeader, List bodies) + throws AMQException, JMSException + { + MessageProperties mprop = (MessageProperties) contentHeader[0]; + String messageType = mprop.getContentType(); + if (messageType == null) + { + _logger.debug("no message type specified, building a byte message"); + messageType = JMSBytesMessage.MIME_TYPE; + } + MessageFactory mf = _mimeShortStringToFactoryMap.get(new AMQShortString(messageType)); + if (mf == null) + { + throw new AMQException(null, "Unsupport MIME type of " + messageType, null); + } + else + { + return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies); + } + } + + public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException { if (mimeType == null) |