summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-16 18:21:48 +0000
committerRobert Greig <rgreig@apache.org>2006-12-16 18:21:48 +0000
commitbcf477d8c2a3e30e5ec109d7af5861d7f344eff1 (patch)
treee6403aad40c260e0635e8ed23a14ae17115aed35
parentfc20045970761b1057a7f8fe5ef9f40bfc9d0240 (diff)
downloadqpid-python-bcf477d8c2a3e30e5ec109d7af5861d7f344eff1.tar.gz
QPID-206 : Fix byte buffer reseting in AbstractJMSMessage
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487849 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java24
5 files changed, 46 insertions, 15 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 4dc6b5f914..097e6bec62 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -76,6 +76,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+ private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
+ new ConcurrentHashMap<BasicMessageConsumer, String>();
/**
* Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -107,6 +109,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
new ConcurrentHashMap<Destination, AtomicInteger>();
+
/**
* Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
* need to be attached to a queue
@@ -1205,7 +1208,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+
_subscriptions.put(name,subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
return subscriber;
}
@@ -1236,6 +1241,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
_subscriptions.put(name,subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
return subscriber;
}
@@ -1280,6 +1286,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// send a queue.delete for the subscription
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
_subscriptions.remove(name);
+ _reverseSubscriptionMap.remove(subscriber);
}
else
{
@@ -1443,6 +1450,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
void deregisterConsumer(BasicMessageConsumer consumer)
{
_consumers.remove(consumer.getConsumerTag());
+ String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+ if(subscriptionName != null)
+ {
+ _subscriptions.remove(subscriptionName);
+ }
+
Destination dest = consumer.getDestination();
synchronized(dest)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
index 014c7c3311..dbc7b72813 100644
--- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
@@ -35,10 +35,10 @@ import javax.jms.TopicSubscriber;
class TopicSubscriberAdaptor implements TopicSubscriber
{
private final Topic _topic;
- private final MessageConsumer _consumer;
+ private final BasicMessageConsumer _consumer;
private final boolean _noLocal;
- TopicSubscriberAdaptor(Topic topic, MessageConsumer consumer, boolean noLocal)
+ TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal)
{
_topic = topic;
_consumer = consumer;
@@ -119,4 +119,10 @@ class TopicSubscriberAdaptor implements TopicSubscriber
throw new javax.jms.IllegalStateException("Invalid Session");
}
}
+
+ BasicMessageConsumer getMessageConsumer()
+ {
+ return _consumer;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index ade330b63b..8982d5d2e1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -479,14 +479,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
// position beyond the start
if (_data != null)
{
- if (!_readableMessage)
- {
- _data.flip();
- }
- else
- {
- _data.rewind();
- }
+ reset();
}
return _data;
}
@@ -525,7 +518,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
return !_readableMessage;
}
- public void reset() throws JMSException
+ public void reset()
{
if (_readableMessage)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
index 04ad15da7a..903f6a9da9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
@@ -21,11 +21,8 @@
package org.apache.qpid.test.unit.basic;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.testutil.VMBrokerSetup;
import org.apache.log4j.Logger;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 026ef2e614..14ceaa75f1 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -200,7 +200,29 @@ public class TopicSessionTest extends TestCase
con.close();
}
- public void testTempoaryTopic() throws Exception
+ public void testSendingSameMessage() throws Exception
+ {
+ AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryTopic topic = session.createTemporaryTopic();
+ assertNotNull(topic);
+ TopicPublisher producer = session.createPublisher(topic);
+ MessageConsumer consumer = session.createConsumer(topic);
+ conn.start();
+ TextMessage sentMessage = session.createTextMessage("Test Message");
+ producer.send(sentMessage);
+ TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
+ assertNotNull(receivedMessage);
+ assertEquals(sentMessage.getText(),receivedMessage.getText());
+ producer.send(sentMessage);
+ receivedMessage = (TextMessage) consumer.receive(2000);
+ assertNotNull(receivedMessage);
+ assertEquals(sentMessage.getText(),receivedMessage.getText());
+
+
+ }
+
+ public void testTemporaryTopic() throws Exception
{
AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);