diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-12-20 13:22:27 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-12-20 13:22:27 +0000 |
commit | dd595081e607f6596d01fc5def90025dc5c650b4 (patch) | |
tree | 9483cdc73a45fb0910971039630b45c6a08a9133 | |
parent | 962e235469aeb7f1174b72e5ff4de0a9d0bfd9a7 (diff) | |
download | qpid-python-dd595081e607f6596d01fc5def90025dc5c650b4.tar.gz |
QPID-223
Applied patch from Rupert Smith
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489082 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 59 insertions, 34 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 4c57f28fef..bf61550cdc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSStreamMessage; @@ -31,7 +32,6 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQMethodEvent; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -72,15 +72,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; /** - * Used to reference durable subscribers so they requests for unsubscribe can be handled - * correctly. Note this only keeps a record of subscriptions which have been created - * in the current instance. It does not remember subscriptions between executions of the - * client + * Used to reference durable subscribers so they requests for unsubscribe can be handled + * correctly. Note this only keeps a record of subscriptions which have been created + * in the current instance. It does not remember subscriptions between executions of the + * client */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = - new ConcurrentHashMap<BasicMessageConsumer, String>(); + new ConcurrentHashMap<BasicMessageConsumer, String>(); /** * Used in the consume method. We generate the consume tag on the client so that we can use the nowait @@ -319,7 +319,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -335,7 +335,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -351,7 +351,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public javax.jms.Message createMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -367,7 +367,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -383,7 +383,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -401,7 +401,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public StreamMessage createStreamMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); @@ -418,7 +418,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); @@ -435,7 +435,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage(String text) throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -505,7 +505,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) @@ -570,7 +570,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request @@ -722,11 +722,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void acknowledge() throws JMSException { - if(isClosed()) + if (isClosed()) { throw new IllegalStateException("Session is already closed"); } - for(BasicMessageConsumer consumer : _consumers.values()) + for (BasicMessageConsumer consumer : _consumers.values()) { consumer.acknowledge(); } @@ -1078,10 +1078,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String tag = Integer.toString(_nextTag++); FieldTable arguments = FieldTableFactory.newFieldTable(); - if (messageSelector != null) + if (messageSelector != null && !messageSelector.equals("")) { - //fixme move literal value to a common class. - arguments.put("x-filter-jms-selector", messageSelector); + arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); } consumer.setConsumerTag(tag); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 572739d0b1..40d8b28411 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -47,7 +47,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach protected ByteBuffer _data; private boolean _readableProperties = false; - private boolean _readableMessage = false; + protected boolean _readableMessage = false; + protected boolean _changedData; private Destination _destination; private BasicMessageConsumer _consumer; @@ -61,6 +62,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } _readableProperties = false; _readableMessage = (data != null); + _changedData = (data == null); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException @@ -522,16 +524,16 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach return !_readableMessage; } - public void reset() + public void reset() { - if (_readableMessage) + if (!_changedData) { _data.rewind(); } else { _data.flip(); - _readableMessage = true; + _changedData = false; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index f5c9f7111a..d769300c69 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -59,6 +59,12 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag super(messageNbr, contentHeader, data); } + public void reset() + { + super.reset(); + _readableMessage = true; + } + public String getMimeType() { return MIME_TYPE; @@ -226,48 +232,56 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag public void writeBoolean(boolean b) throws JMSException { checkWritable(); + _changedData = true; _data.put(b ? (byte) 1 : (byte) 0); } public void writeByte(byte b) throws JMSException { checkWritable(); + _changedData = true; _data.put(b); } public void writeShort(short i) throws JMSException { checkWritable(); + _changedData = true; _data.putShort(i); } public void writeChar(char c) throws JMSException { checkWritable(); + _changedData = true; _data.putChar(c); } public void writeInt(int i) throws JMSException { checkWritable(); + _changedData = true; _data.putInt(i); } public void writeLong(long l) throws JMSException { checkWritable(); + _changedData = true; _data.putLong(l); } public void writeFloat(float v) throws JMSException { checkWritable(); + _changedData = true; _data.putFloat(v); } public void writeDouble(double v) throws JMSException { checkWritable(); + _changedData = true; _data.putDouble(v); } @@ -281,7 +295,7 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag _data.putShort((short)encodedString.limit()); _data.put(encodedString); - + _changedData = true; //_data.putString(string, Charset.forName("UTF-8").newEncoder()); // we must add the null terminator manually //_data.put((byte)0); @@ -298,12 +312,14 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag { checkWritable(); _data.put(bytes); + _changedData = true; } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException { checkWritable(); _data.put(bytes, offset, length); + _changedData = true; } public void writeObject(Object object) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 4fb070d2ff..35c5377f14 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -112,7 +112,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } } - + public Serializable getObject() throws JMSException { ObjectInputStream in = null; @@ -123,18 +123,18 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag try { - _data.rewind(); + _data.rewind(); in = new ObjectInputStream(_data.asInputStream()); return (Serializable) in.readObject(); } catch (IOException e) - { - e.printStackTrace(); - throw new MessageFormatException("Could not deserialize message: " + e); + { + e.printStackTrace(); + throw new MessageFormatException("Could not deserialize message: " + e); } catch (ClassNotFoundException e) { - e.printStackTrace(); + e.printStackTrace(); throw new MessageFormatException("Could not deserialize message: " + e); } finally diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index c2dfdc1b65..6709ff802d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -86,6 +86,12 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess super(messageNbr, contentHeader, data); } + public void reset() + { + super.reset(); + _readableMessage = true; + } + public String getMimeType() { return MIME_TYPE; @@ -103,6 +109,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess { checkWritable(); _data.put(type); + _changedData = true; } public boolean readBoolean() throws JMSException @@ -693,7 +700,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess { _data.putString(string, Charset.forName("UTF-8").newEncoder()); // we must write the null terminator ourselves - _data.put((byte)0); + _data.put((byte) 0); } catch (CharacterCodingException e) { @@ -706,7 +713,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeBytes(byte[] bytes) throws JMSException { - writeBytes(bytes, 0, bytes == null?0:bytes.length); + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 76f8a1c32f..d8394b0489 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -117,6 +117,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding())); } + _changedData=true; } _decodedValue = text; } |