diff options
author | Robert Greig <rgreig@apache.org> | 2007-02-12 13:25:36 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-02-12 13:25:36 +0000 |
commit | b53c13e9d33aa35ed38c647bfa29fab0bbe58915 (patch) | |
tree | a3f9401cd095238ca4f4f5d6b04582f6a33adc56 /java | |
parent | cd8ccb1a691ef5eb260b165f08fd9a07d1e5867d (diff) | |
download | qpid-python-b53c13e9d33aa35ed38c647bfa29fab0bbe58915.tar.gz |
(Patch submitted by Rupert Smith) Qpid-360 fixes.
Message type defaults to ByteMessage when not specified.
Unknown destination type is used as default when not specified.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@506439 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
5 files changed, 437 insertions, 260 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index fc3450c385..a0110cc8af 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -20,17 +20,20 @@ */ package org.apache.qpid.client; +import java.io.UnsupportedEncodingException; + +import javax.jms.*; + import org.apache.log4j.Logger; + import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; -import javax.jms.*; -import java.io.UnsupportedEncodingException; - public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { protected final Logger _logger = Logger.getLogger(getClass()); @@ -101,9 +104,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private final boolean _waitUntilSent; private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; - protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, - int channelId, AMQSession session, AMQProtocolHandler protocolHandler, - long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent) + protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, + boolean immediate, boolean mandatory, boolean waitUntilSent) { _connection = connection; _destination = destination; @@ -116,6 +119,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { declareDestination(destination); } + _immediate = immediate; _mandatory = mandatory; _waitUntilSent = waitUntilSent; @@ -134,18 +138,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // Declare the exchange // Note that the durable and internal arguments are ignored since passive is set to false // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - destination.getExchangeName(), // exchange - false, // internal - true, // nowait - false, // passive - _session.getTicket(), // ticket - destination.getExchangeClass()); // type + AMQFrame declare = + ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + destination.getExchangeName(), // exchange + false, // internal + true, // nowait + false, // passive + _session.getTicket(), // ticket + destination.getExchangeClass()); // type _protocolHandler.writeFrame(declare); } @@ -159,6 +162,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public boolean getDisableMessageID() throws JMSException { checkNotClosed(); + // Always false for AMQP return false; } @@ -172,39 +176,44 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public boolean getDisableMessageTimestamp() throws JMSException { checkNotClosed(); + return _disableTimestamps; } public void setDeliveryMode(int i) throws JMSException { checkPreConditions(); - if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT) + if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT)) { - throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + - " is illegal"); + throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i + + " is illegal"); } + _deliveryMode = i; } public int getDeliveryMode() throws JMSException { checkNotClosed(); + return _deliveryMode; } public void setPriority(int i) throws JMSException { checkPreConditions(); - if (i < 0 || i > 9) + if ((i < 0) || (i > 9)) { throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9"); } + _messagePriority = i; } public int getPriority() throws JMSException { checkNotClosed(); + return _messagePriority; } @@ -215,18 +224,21 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l); } + _timeToLive = l; } public long getTimeToLive() throws JMSException { checkNotClosed(); + return _timeToLive; } public Destination getDestination() throws JMSException { checkNotClosed(); + return _destination; } @@ -241,11 +253,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkPreConditions(); checkInitialDestination(); - synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, - _mandatory, _immediate); + sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); } } @@ -256,8 +266,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, - _mandatory, _immediate); + sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); } } @@ -267,20 +276,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, - _mandatory, immediate); + sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate); } } - public void send(Message message, int deliveryMode, int priority, - long timeToLive) throws JMSException + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { checkPreConditions(); checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, - _immediate); + sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); } } @@ -291,69 +297,60 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, - _mandatory, _immediate); + sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, + _immediate); } } - public void send(Destination destination, Message message, int deliveryMode, - int priority, long timeToLive) - throws JMSException + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) + throws JMSException { checkPreConditions(); checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, - _mandatory, _immediate); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); } } - public void send(Destination destination, Message message, int deliveryMode, - int priority, long timeToLive, boolean mandatory) - throws JMSException + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory) throws JMSException { checkPreConditions(); checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, - mandatory, _immediate); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate); } } - public void send(Destination destination, Message message, int deliveryMode, - int priority, long timeToLive, boolean mandatory, boolean immediate) - throws JMSException + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate) throws JMSException { checkPreConditions(); checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, - mandatory, immediate); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate); } } - public void send(Destination destination, Message message, int deliveryMode, - int priority, long timeToLive, boolean mandatory, - boolean immediate, boolean waitUntilSent) - throws JMSException + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException { checkPreConditions(); checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, - mandatory, immediate, waitUntilSent); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, + waitUntilSent); } } - private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException { if (message instanceof AbstractJMSMessage) @@ -366,23 +363,23 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (message instanceof BytesMessage) { - newMessage = new MessageConverter((BytesMessage)message).getConvertedMessage(); + newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage(); } else if (message instanceof MapMessage) { - newMessage = new MessageConverter((MapMessage)message).getConvertedMessage(); + newMessage = new MessageConverter((MapMessage) message).getConvertedMessage(); } else if (message instanceof ObjectMessage) { - newMessage = new MessageConverter((ObjectMessage)message).getConvertedMessage(); + newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage(); } else if (message instanceof TextMessage) { - newMessage = new MessageConverter((TextMessage)message).getConvertedMessage(); + newMessage = new MessageConverter((TextMessage) message).getConvertedMessage(); } else if (message instanceof StreamMessage) { - newMessage = new MessageConverter((StreamMessage)message).getConvertedMessage(); + newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage(); } else { @@ -395,24 +392,25 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } else { - throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName()); + throw new JMSException("Unable to send message, due to class conversion error: " + + message.getClass().getName()); } } } - private void validateDestination(Destination destination) throws JMSException { if (!(destination instanceof AMQDestination)) { - throw new JMSException("Unsupported destination class: " + - (destination != null ? destination.getClass() : null)); + throw new JMSException("Unsupported destination class: " + + ((destination != null) ? destination.getClass() : null)); } + declareDestination((AMQDestination) destination); } - protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate) throws JMSException + protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate) throws JMSException { sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); } @@ -429,21 +427,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @param immediate * @throws JMSException */ - protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException + protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate, boolean wait) throws JMSException { checkTemporaryDestination(destination); origMessage.setJMSDestination(destination); - AbstractJMSMessage message = convertToNativeMessage(origMessage); int type; - if(destination instanceof Topic) + if (destination instanceof Topic) { type = AMQDestination.TOPIC_TYPE; } - else if(destination instanceof Queue) + else if (destination instanceof Queue) { type = AMQDestination.QUEUE_TYPE; } @@ -452,22 +449,19 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j type = AMQDestination.UNKNOWN_TYPE; } - message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), - type); + message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - destination.getExchangeName(), // exchange - immediate, // immediate - mandatory, // mandatory - destination.getRoutingKey(), // routingKey - _session.getTicket()); // ticket - - + AMQFrame publishFrame = + BasicPublishBody.createAMQFrame( + _channelId, _protocolHandler.getProtocolMajorVersion(), _protocolHandler.getProtocolMinorVersion(), + destination.getExchangeName(), // exchange + immediate, // immediate + mandatory, // mandatory + destination.getRoutingKey(), // routingKey + _session.getTicket()); // ticket message.prepareForSending(); ByteBuffer payload = message.getData(); @@ -487,6 +481,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j contentHeaderProperties.setExpiration(0); } } + contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); @@ -494,12 +489,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; - if(payload != null) + if (payload != null) { createContentBodies(payload, frames, 2, _channelId); } - if (contentBodyFrameCount != 0 && _logger.isDebugEnabled()) + if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) { _logger.debug("Sending content body frames to " + destination); } @@ -508,12 +503,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, - BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), - 0, - contentHeaderProperties, - size); + ContentHeaderBody.createAMQFrame(_channelId, + BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion()), 0, + contentHeaderProperties, size); if (_logger.isDebugEnabled()) { _logger.debug("Sending content header frame to " + destination); @@ -524,7 +517,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); _protocolHandler.writeFrame(compositeFrame, wait); - if (message != origMessage) { _logger.debug("Updating original message"); @@ -538,16 +530,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private void checkTemporaryDestination(AMQDestination destination) throws JMSException { - if(destination instanceof TemporaryDestination) + if (destination instanceof TemporaryDestination) { _logger.debug("destination is temporary destination"); TemporaryDestination tempDest = (TemporaryDestination) destination; - if(tempDest.getSession().isClosed()) + if (tempDest.getSession().isClosed()) { _logger.debug("session is closed"); throw new JMSException("Session for temporary destination has been closed"); } - if(tempDest.isDeleted()) + + if (tempDest.isDeleted()) { _logger.debug("destination is deleted"); throw new JMSException("Cannot send to a deleted temporary destination"); @@ -567,9 +560,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) { - if (frames.length == offset + 1) + if (frames.length == (offset + 1)) { - frames[offset] = ContentBody.createAMQFrame(channelId,new ContentBody(payload)); + frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); } else { @@ -578,10 +571,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j long remaining = payload.remaining(); for (int i = offset; i < frames.length; i++) { - payload.position((int) framePayloadMax * (i-offset)); + payload.position((int) framePayloadMax * (i - offset)); int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; payload.limit(payload.position() + length); - frames[i] = ContentBody.createAMQFrame(channelId,new ContentBody(payload.slice())); + frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); remaining -= length; } @@ -594,7 +587,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // we substract one from the total frame maximum size to account for the end of frame marker in a body frame // (0xCE byte). int frameCount; - if(payload == null || payload.remaining() == 0) + if ((payload == null) || (payload.remaining() == 0)) { frameCount = 0; } @@ -602,9 +595,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { int dataLength = payload.remaining(); final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; + int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; frameCount = (int) (dataLength / framePayloadMax) + lastFrame; } + return frameCount; } @@ -624,7 +618,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { checkNotClosed(); - if (_session == null || _session.isClosed()) + if ((_session == null) || _session.isClosed()) { throw new javax.jms.IllegalStateException("Invalid Session"); } @@ -640,9 +634,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException { - if (_destination != null && suppliedDestination != null) + if ((_destination != null) && (suppliedDestination != null)) { - throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); + throw new UnsupportedOperationException( + "This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); } if (suppliedDestination == null) @@ -650,10 +645,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throw new InvalidDestinationException("Supplied Destination was invalid"); } - } - public AMQSession getSession() { return _session; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 8b6f2b4ab1..e3388be9ed 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -20,27 +20,29 @@ */ package org.apache.qpid.client.message; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; + +import javax.jms.*; + import org.apache.commons.collections.map.ReferenceMap; + import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; -import org.apache.qpid.url.BindingURL; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.AMQUndefinedDestination; import org.apache.qpid.client.*; +import org.apache.qpid.client.AMQUndefinedDestination; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.AMQShortString; - -import javax.jms.*; -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.URLSyntaxException; public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message { private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); - protected boolean _redelivered; @@ -60,10 +62,11 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { _data.acquire(); } + _readableProperties = false; _readableMessage = (data != null); _changedData = (data == null); - _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders()); + _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, @@ -71,28 +74,29 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { this(contentHeader, deliveryTag); - - int type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); + Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); + int contentType = (type == null) ? AMQDestination.UNKNOWN_TYPE : type.intValue(); AMQDestination dest; - switch(type) + switch (contentType) { - case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(exchange, routingKey, routingKey); - break; - case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(exchange, routingKey, null); - break; - default: - dest = new AMQUndefinedDestination(exchange, routingKey, null); - break; + + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; + + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; + + default: + dest = new AMQUndefinedDestination(exchange, routingKey, null); + break; } //Destination dest = AMQDestination.createDestination(url); setJMSDestination(dest); - - _data = data; if (_data != null) { @@ -107,7 +111,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { super(contentHeader, deliveryTag); _readableProperties = (_contentHeaderProperties != null); - _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders()); + _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); } public String getJMSMessageID() throws JMSException @@ -116,6 +120,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { getContentHeaderProperties().setMessageId("ID:" + _deliveryTag); } + return getContentHeaderProperties().getMessageId(); } @@ -178,6 +183,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _destinationCache.put(replyToEncoding, dest); } + return dest; } } @@ -188,11 +194,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { throw new IllegalArgumentException("Null destination not allowed"); } + if (!(destination instanceof AMQDestination)) { - throw new IllegalArgumentException("ReplyTo destination may only be an AMQDestination - passed argument was type " + - destination.getClass()); + throw new IllegalArgumentException( + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); } + final AMQDestination amqd = (AMQDestination) destination; final AMQShortString encodedDestination = amqd.getEncodedName(); @@ -278,17 +286,17 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _readableMessage = false; } - public boolean propertyExists(AMQShortString propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().propertyExists(propertyName); } - public boolean propertyExists(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().propertyExists(propertyName); } @@ -299,7 +307,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach return getJmsHeaders().getBoolean(propertyName); } - public boolean getBooleanProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); @@ -310,48 +317,56 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte getByteProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getByte(propertyName); } public short getShortProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getShort(propertyName); } public int getIntProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getInteger(propertyName); } public long getLongProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getLong(propertyName); } public float getFloatProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getFloat(propertyName); } public double getDoubleProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getDouble(propertyName); } public String getStringProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getString(propertyName); } public Object getObjectProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getObject(propertyName); } @@ -436,7 +451,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach getJmsHeaders().remove(propertyName); } - protected void removeProperty(String propertyName) throws JMSException { checkPropertyName(propertyName); @@ -468,7 +482,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } - /** * This forces concrete classes to implement clearBody() * @@ -511,6 +524,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { buf.append('\n').append(getJmsHeaders().getHeaders()); } + return buf.toString(); } catch (JMSException e) @@ -519,7 +533,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } - public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties) { getContentHeaderProperties().setHeaders(messageProperties); @@ -550,6 +563,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { reset(); } + return _data; } @@ -608,6 +622,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException { checkPropertyName(propertyName); + return getJmsHeaders().getBytes(propertyName); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 83dcc57b80..e02771d8f5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,20 +20,40 @@ */ package org.apache.qpid.client.message; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.JMSException; + import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.AMQShortString; - -import javax.jms.JMSException; -import java.util.HashMap; -import java.util.Map; -import java.util.List; public class MessageFactoryRegistry { private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>(); - private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = new HashMap<AMQShortString, MessageFactory>(); + private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = + new HashMap<AMQShortString, MessageFactory>(); + + /** + * Construct a new registry with the default message factories registered + * @return a message factory registry + */ + public static MessageFactoryRegistry newDefaultRegistry() + { + MessageFactoryRegistry mf = new MessageFactoryRegistry(); + mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory()); + mf.registerFactory("text/plain", new JMSTextMessageFactory()); + mf.registerFactory("text/xml", new JMSTextMessageFactory()); + mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory()); + mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); + mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); + mf.registerFactory(null, new JMSBytesMessageFactory()); + + return mf; + } public void registerFactory(String mimeType, MessageFactory mf) { @@ -41,6 +61,7 @@ public class MessageFactoryRegistry { throw new IllegalArgumentException("Message factory must not be null"); } + _mimeStringToFactoryMap.put(mimeType, mf); _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf); } @@ -48,6 +69,7 @@ public class MessageFactoryRegistry public MessageFactory deregisterFactory(String mimeType) { _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType)); + return _mimeStringToFactoryMap.remove(mimeType); } @@ -62,14 +84,19 @@ public class MessageFactoryRegistry * @throws AMQException * @throws JMSException */ - public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, - AMQShortString exchange, - AMQShortString routingKey, - ContentHeaderBody contentHeader, - List bodies) throws AMQException, JMSException + public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, + AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies) + throws AMQException, JMSException { - BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; - MessageFactory mf = _mimeShortStringToFactoryMap.get(properties.getContentTypeShortString()); + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; + + // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over + // AMQP. When the type is null, it can only be assumed that the message is a byte message. + AMQShortString contentTypeShortString = properties.getContentTypeShortString(); + contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE) + : contentTypeShortString; + + MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString); if (mf == null) { throw new AMQException("Unsupport MIME type of " + properties.getContentType()); @@ -86,6 +113,7 @@ public class MessageFactoryRegistry { throw new IllegalArgumentException("Mime type must not be null"); } + MessageFactory mf = _mimeStringToFactoryMap.get(mimeType); if (mf == null) { @@ -96,21 +124,4 @@ public class MessageFactoryRegistry return mf.createMessage(); } } - - /** - * Construct a new registry with the default message factories registered - * @return a message factory registry - */ - public static MessageFactoryRegistry newDefaultRegistry() - { - MessageFactoryRegistry mf = new MessageFactoryRegistry(); - mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory()); - mf.registerFactory("text/plain", new JMSTextMessageFactory()); - mf.registerFactory("text/xml", new JMSTextMessageFactory()); - mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory()); - mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); - mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); - mf.registerFactory(null, new JMSBytesMessageFactory()); - return mf; - } } diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java index 47c608cfe4..76a0690b8c 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,122 +20,277 @@ */ package org.apache.qpid.topic; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; +import java.util.Random; + +import javax.jms.*; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; + +/** + * This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for + * cross testing the java and cpp clients. + * + * <p/>How the cpp topic_publisher operates: + * It publishes text messages to the default topic exchange, on virtual host "/test", on the topic "topic_control", for + * the specified number of test messages to be sent. + * It publishes a report request message (on same topic), with the header text field "TYPE", value "REPORT_REQUEST", + * optionally within a transaction, and waits for the specified number of consumers to reply to this request. The + * listeners should reply to this message on a queue named "response", on virtual host "/test", with some sort of message + * about the number of messages received and how long it took, although the publisher never looks at the message content. + * The publisher then send a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST", + * which the listener should close its connection and terminate upon receipt of. + * + * @deprecated Use PingPongBouncer instead once the below todo is completed. + * + * @todo Make the functionality of this class available through PingPongBouncer. Rename PingPongBouncer to + * PingListener and make its bouncing functionality optional, either through a switch or as an extending class + * called PingBouncer. Want to have as few ping classes as possible with configurable behaviour, re-using code + * accross p2p and topic style tests in almost all cases. + */ public class Listener implements MessageListener { + private static Logger log = Logger.getLogger(Listener.class); + + private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + public static final String CONTROL_TOPIC = "topic_control"; + public static final String RESPONSE_QUEUE = "response"; + + private final Topic _topic; + //private final Topic _control; + + private final Queue _response; + + private final byte[] _payload; + + /** Holds the connection to listen on. */ private final Connection _connection; + + /** Holds the producer to send control messages on. */ private final MessageProducer _controller; + + /** Holds the JMS session. */ private final javax.jms.Session _session; - private final MessageFactory _factory; + + /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */ private boolean init; + + /** Holds the count of messages received by this listener. */ private int count; - private long start; - Listener(Connection connection, int ackMode) throws Exception - { - this(connection, ackMode, null); - } + /** Used to hold the start time of the first message. */ + private long start; + private static String clientId; Listener(Connection connection, int ackMode, String name) throws Exception { + log.debug("Listener(Connection connection = " + connection + ", int ackMode = " + ackMode + ", String name = " + name + + "): called"); + _connection = connection; _session = connection.createSession(false, ackMode); - _factory = new MessageFactory(_session); - //register for events - if(name == null) + if (_session instanceof AMQSession) { - _factory.createTopicConsumer().setMessageListener(this); + _topic = new AMQTopic(CONTROL_TOPIC); + //_control = new AMQTopic(CONTROL_TOPIC); + _response = new AMQQueue(RESPONSE_QUEUE); } else { - _factory.createDurableTopicConsumer(name).setMessageListener(this); + _topic = _session.createTopic(CONTROL_TOPIC); + //_control = _session.createTopic(CONTROL_TOPIC); + _response = _session.createQueue(RESPONSE_QUEUE); } - _connection.start(); + int size = 256; - _controller = _factory.createControlPublisher(); - System.out.println("Waiting for messages " + - Config.getAckModeDescription(ackMode) - + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")") - + "..."); + _payload = new byte[size]; - } + for (int i = 0; i < size; i++) + { + _payload[i] = (byte) DATA[i % DATA.length]; + } - private void shutdown() - { - try + //register for events + if (name == null) { - _session.close(); - _connection.stop(); - _connection.close(); + log.debug("Calling _factory.createTopicConsumer().setMessageListener(this)"); + createTopicConsumer().setMessageListener(this); } - catch(Exception e) + else { - e.printStackTrace(System.out); + log.debug("Calling createDurableTopicConsumer(name).setMessageListener(this)"); + createDurableTopicConsumer(name).setMessageListener(this); } + + _connection.start(); + + _controller = createControlPublisher(); + System.out.println("Waiting for messages " + Config.getAckModeDescription(ackMode) + + + ((name == null) + ? "" : (" (subscribed with name " + name + " and client id " + connection.getClientID() + ")")) + + "..."); } - private void report() + public static void main(String[] argv) throws Exception { - try - { - String msg = getReport(); - _controller.send(_factory.createReportResponseMessage(msg)); - System.out.println("Sent report: " + msg); - } - catch(Exception e) + clientId = "Listener-" + System.currentTimeMillis(); + + NDC.push(clientId); + + Config config = new Config(); + config.setOptions(argv); + + //Connection con = config.createConnection(); + Connection con = + new AMQConnection("amqp://guest:guest@testid/test?brokerlist='" + config.getHost() + ":" + config.getPort() + + "'"); + + if (config.getClientId() != null) { - e.printStackTrace(System.out); + con.setClientID(config.getClientId()); } + + new Listener(con, config.getAckMode(), config.getSubscriptionId()); + + NDC.pop(); + NDC.remove(); } - private String getReport() + /** + * Checks whether or not a text field on a message has the specified value. + * + * @param m The message to check. + * @param fieldName The name of the field to check. + * @param value The expected value of the field to compare with. + * + * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException { - long time = (System.currentTimeMillis() - start); - return "Received " + count + " in " + time + "ms"; + log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName + + ", String value = " + value + "): called"); + + String comp = m.getStringProperty(fieldName); + + return (comp != null) && comp.equals(value); } public void onMessage(Message message) { - if(!init) + NDC.push(clientId); + + log.debug("public void onMessage(Message message): called"); + + if (!init) { - start = System.currentTimeMillis(); + start = System.nanoTime() / 1000000; count = 0; init = true; } - if(_factory.isShutdown(message)) + try { - shutdown(); + if (isShutdown(message)) + { + shutdown(); + } + else if (isReport(message)) + { + //send a report: + report(); + init = false; + } } - else if(_factory.isReport(message)) + catch (JMSException e) { - //send a report: - report(); - init = false; + log.warn("There was a JMSException during onMessage.", e); } - else if (++count % 100 == 0) + finally { - System.out.println("Received " + count + " messages."); + NDC.pop(); } } - public static void main(String[] argv) throws Exception + Message createReportResponseMessage(String msg) throws JMSException { - Config config = new Config(); - config.setOptions(argv); + return _session.createTextMessage(msg); + } + + boolean isShutdown(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST"); + + log.debug("isShutdown = " + result); + + return result; + } + + boolean isReport(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST"); + + log.debug("isReport = " + result); + + return result; + } + + MessageConsumer createTopicConsumer() throws Exception + { + return _session.createConsumer(_topic); + } + + MessageConsumer createDurableTopicConsumer(String name) throws Exception + { + return _session.createDurableSubscriber(_topic, name); + } + + MessageProducer createControlPublisher() throws Exception + { + return _session.createProducer(_response); + } - Connection con = config.createConnection(); - if(config.getClientId() != null) + private void shutdown() + { + try { - con.setClientID(config.getClientId()); + _session.close(); + _connection.stop(); + _connection.close(); } - new Listener(con, config.getAckMode(), config.getSubscriptionId()); + catch (Exception e) + { + e.printStackTrace(System.out); + } + } + + private void report() + { + try + { + String msg = getReport(); + _controller.send(createReportResponseMessage(msg)); + System.out.println("Sent report: " + msg); + } + catch (Exception e) + { + e.printStackTrace(System.out); + } + } + + private String getReport() + { + long time = ((System.nanoTime() / 1000000) - start); + + return "Received " + count + " in " + time + "ms"; } } diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java index 1520f18408..8b87f76c3e 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,11 +20,11 @@ */ package org.apache.qpid.topic; +import javax.jms.*; + import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import javax.jms.*; - /** */ class MessageFactory @@ -36,7 +36,6 @@ class MessageFactory private final Topic _control; private final byte[] _payload; - MessageFactory(Session session) throws JMSException { this(session, 256); @@ -45,24 +44,39 @@ class MessageFactory MessageFactory(Session session, int size) throws JMSException { _session = session; - if(session instanceof AMQSession) + if (session instanceof AMQSession) { - _topic = new AMQTopic("topictest.messages"); + _topic = new AMQTopic("topic_control"); _control = new AMQTopic("topictest.control"); } else { - _topic = session.createTopic("topictest.messages"); + _topic = session.createTopic("topic_control"); _control = session.createTopic("topictest.control"); } + _payload = new byte[size]; - for(int i = 0; i < size; i++) + for (int i = 0; i < size; i++) { _payload[i] = (byte) DATA[i % DATA.length]; } } + private static boolean checkText(Message m, String s) + { + try + { + return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + + return false; + } + } + Topic getTopic() { return _topic; @@ -72,6 +86,7 @@ class MessageFactory { BytesMessage msg = _session.createBytesMessage(); msg.writeBytes(_payload); + return msg; } @@ -109,6 +124,7 @@ class MessageFactory catch (JMSException e) { e.printStackTrace(System.out); + return e.toString(); } } @@ -137,17 +153,4 @@ class MessageFactory { return _session.createProducer(_control); } - - private static boolean checkText(Message m, String s) - { - try - { - return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); - } - catch (JMSException e) - { - e.printStackTrace(System.out); - return false; - } - } } |