diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-16 18:21:48 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-16 18:21:48 +0000 |
commit | bcf477d8c2a3e30e5ec109d7af5861d7f344eff1 (patch) | |
tree | e6403aad40c260e0635e8ed23a14ae17115aed35 | |
parent | fc20045970761b1057a7f8fe5ef9f40bfc9d0240 (diff) | |
download | qpid-python-bcf477d8c2a3e30e5ec109d7af5861d7f344eff1.tar.gz |
QPID-206 : Fix byte buffer reseting in AbstractJMSMessage
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487849 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 46 insertions, 15 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 4dc6b5f914..097e6bec62 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -76,6 +76,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); + private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = + new ConcurrentHashMap<BasicMessageConsumer, String>(); /** * Used in the consume method. We generate the consume tag on the client so that we can use the nowait @@ -107,6 +109,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = new ConcurrentHashMap<Destination, AtomicInteger>(); + /** * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not * need to be attached to a queue @@ -1205,7 +1208,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + _subscriptions.put(name,subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name); return subscriber; } @@ -1236,6 +1241,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); _subscriptions.put(name,subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name); return subscriber; } @@ -1280,6 +1286,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // send a queue.delete for the subscription deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); _subscriptions.remove(name); + _reverseSubscriptionMap.remove(subscriber); } else { @@ -1443,6 +1450,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void deregisterConsumer(BasicMessageConsumer consumer) { _consumers.remove(consumer.getConsumerTag()); + String subscriptionName = _reverseSubscriptionMap.remove(consumer); + if(subscriptionName != null) + { + _subscriptions.remove(subscriptionName); + } + Destination dest = consumer.getDestination(); synchronized(dest) { diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java index 014c7c3311..dbc7b72813 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java @@ -35,10 +35,10 @@ import javax.jms.TopicSubscriber; class TopicSubscriberAdaptor implements TopicSubscriber { private final Topic _topic; - private final MessageConsumer _consumer; + private final BasicMessageConsumer _consumer; private final boolean _noLocal; - TopicSubscriberAdaptor(Topic topic, MessageConsumer consumer, boolean noLocal) + TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal) { _topic = topic; _consumer = consumer; @@ -119,4 +119,10 @@ class TopicSubscriberAdaptor implements TopicSubscriber throw new javax.jms.IllegalStateException("Invalid Session"); } } + + BasicMessageConsumer getMessageConsumer() + { + return _consumer; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index ade330b63b..8982d5d2e1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -479,14 +479,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms // position beyond the start if (_data != null) { - if (!_readableMessage) - { - _data.flip(); - } - else - { - _data.rewind(); - } + reset(); } return _data; } @@ -525,7 +518,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms return !_readableMessage; } - public void reset() throws JMSException + public void reset() { if (_readableMessage) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java index 04ad15da7a..903f6a9da9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java @@ -21,11 +21,8 @@ package org.apache.qpid.test.unit.basic; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.testutil.VMBrokerSetup; import org.apache.log4j.Logger; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 026ef2e614..14ceaa75f1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -200,7 +200,29 @@ public class TopicSessionTest extends TestCase con.close(); } - public void testTempoaryTopic() throws Exception + public void testSendingSameMessage() throws Exception + { + AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + TopicPublisher producer = session.createPublisher(topic); + MessageConsumer consumer = session.createConsumer(topic); + conn.start(); + TextMessage sentMessage = session.createTextMessage("Test Message"); + producer.send(sentMessage); + TextMessage receivedMessage = (TextMessage) consumer.receive(2000); + assertNotNull(receivedMessage); + assertEquals(sentMessage.getText(),receivedMessage.getText()); + producer.send(sentMessage); + receivedMessage = (TextMessage) consumer.receive(2000); + assertNotNull(receivedMessage); + assertEquals(sentMessage.getText(),receivedMessage.getText()); + + + } + + public void testTemporaryTopic() throws Exception { AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); |