summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-20 13:22:27 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-20 13:22:27 +0000
commitdd595081e607f6596d01fc5def90025dc5c650b4 (patch)
tree9483cdc73a45fb0910971039630b45c6a08a9133
parent962e235469aeb7f1174b72e5ff4de0a9d0bfd9a7 (diff)
downloadqpid-python-dd595081e607f6596d01fc5def90025dc5c650b4.tar.gz
QPID-223
Applied patch from Rupert Smith git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489082 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java41
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java1
6 files changed, 59 insertions, 34 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 4c57f28fef..bf61550cdc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
@@ -31,7 +32,6 @@ import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQMethodEvent;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -72,15 +72,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
/**
- * Used to reference durable subscribers so they requests for unsubscribe can be handled
- * correctly. Note this only keeps a record of subscriptions which have been created
- * in the current instance. It does not remember subscriptions between executions of the
- * client
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled
+ * correctly. Note this only keeps a record of subscriptions which have been created
+ * in the current instance. It does not remember subscriptions between executions of the
+ * client
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<BasicMessageConsumer, String>();
+ new ConcurrentHashMap<BasicMessageConsumer, String>();
/**
* Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -319,7 +319,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -335,7 +335,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MapMessage createMapMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -351,7 +351,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public javax.jms.Message createMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -367,7 +367,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -383,7 +383,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -401,7 +401,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public StreamMessage createStreamMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
@@ -418,7 +418,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
@@ -435,7 +435,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -505,7 +505,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -570,7 +570,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e)
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -722,11 +722,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void acknowledge() throws JMSException
{
- if(isClosed())
+ if (isClosed())
{
throw new IllegalStateException("Session is already closed");
}
- for(BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.acknowledge();
}
@@ -1078,10 +1078,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String tag = Integer.toString(_nextTag++);
FieldTable arguments = FieldTableFactory.newFieldTable();
- if (messageSelector != null)
+ if (messageSelector != null && !messageSelector.equals(""))
{
- //fixme move literal value to a common class.
- arguments.put("x-filter-jms-selector", messageSelector);
+ arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
}
consumer.setConsumerTag(tag);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 572739d0b1..40d8b28411 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -47,7 +47,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
protected ByteBuffer _data;
private boolean _readableProperties = false;
- private boolean _readableMessage = false;
+ protected boolean _readableMessage = false;
+ protected boolean _changedData;
private Destination _destination;
private BasicMessageConsumer _consumer;
@@ -61,6 +62,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
_readableProperties = false;
_readableMessage = (data != null);
+ _changedData = (data == null);
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
@@ -522,16 +524,16 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
return !_readableMessage;
}
- public void reset()
+ public void reset()
{
- if (_readableMessage)
+ if (!_changedData)
{
_data.rewind();
}
else
{
_data.flip();
- _readableMessage = true;
+ _changedData = false;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index f5c9f7111a..d769300c69 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -59,6 +59,12 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
super(messageNbr, contentHeader, data);
}
+ public void reset()
+ {
+ super.reset();
+ _readableMessage = true;
+ }
+
public String getMimeType()
{
return MIME_TYPE;
@@ -226,48 +232,56 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
public void writeBoolean(boolean b) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.put(b ? (byte) 1 : (byte) 0);
}
public void writeByte(byte b) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.put(b);
}
public void writeShort(short i) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putShort(i);
}
public void writeChar(char c) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putChar(c);
}
public void writeInt(int i) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putInt(i);
}
public void writeLong(long l) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putLong(l);
}
public void writeFloat(float v) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putFloat(v);
}
public void writeDouble(double v) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putDouble(v);
}
@@ -281,7 +295,7 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
_data.putShort((short)encodedString.limit());
_data.put(encodedString);
-
+ _changedData = true;
//_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must add the null terminator manually
//_data.put((byte)0);
@@ -298,12 +312,14 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
{
checkWritable();
_data.put(bytes);
+ _changedData = true;
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
{
checkWritable();
_data.put(bytes, offset, length);
+ _changedData = true;
}
public void writeObject(Object object) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index 4fb070d2ff..35c5377f14 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -112,7 +112,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
}
-
+
public Serializable getObject() throws JMSException
{
ObjectInputStream in = null;
@@ -123,18 +123,18 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
try
{
- _data.rewind();
+ _data.rewind();
in = new ObjectInputStream(_data.asInputStream());
return (Serializable) in.readObject();
}
catch (IOException e)
- {
- e.printStackTrace();
- throw new MessageFormatException("Could not deserialize message: " + e);
+ {
+ e.printStackTrace();
+ throw new MessageFormatException("Could not deserialize message: " + e);
}
catch (ClassNotFoundException e)
{
- e.printStackTrace();
+ e.printStackTrace();
throw new MessageFormatException("Could not deserialize message: " + e);
}
finally
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index c2dfdc1b65..6709ff802d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -86,6 +86,12 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
super(messageNbr, contentHeader, data);
}
+ public void reset()
+ {
+ super.reset();
+ _readableMessage = true;
+ }
+
public String getMimeType()
{
return MIME_TYPE;
@@ -103,6 +109,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
{
checkWritable();
_data.put(type);
+ _changedData = true;
}
public boolean readBoolean() throws JMSException
@@ -693,7 +700,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
{
_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must write the null terminator ourselves
- _data.put((byte)0);
+ _data.put((byte) 0);
}
catch (CharacterCodingException e)
{
@@ -706,7 +713,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public void writeBytes(byte[] bytes) throws JMSException
{
- writeBytes(bytes, 0, bytes == null?0:bytes.length);
+ writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index 76f8a1c32f..d8394b0489 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -117,6 +117,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
{
_data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
}
+ _changedData=true;
}
_decodedValue = text;
}