summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-07 11:04:22 +0000
committerKeith Wall <kwall@apache.org>2011-11-07 11:04:22 +0000
commita996985f58a8843104c57c8e2cde185bfc143480 (patch)
tree1ddf19b34781bb5f73387c284290c3733b50048b
parent36d4650990d99bdbc29609567e56d846839edc46 (diff)
downloadqpid-python-a996985f58a8843104c57c8e2cde185bfc143480.tar.gz
QPID-3536: 0-10 overrides JMS AcceptMode with a defaulted (not explicitly set) Link Reliability of UNRELIABLE
Applied patch from Andrew MacBean <andymacbean@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1198701 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java61
4 files changed, 51 insertions, 36 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index c26fe98568..721ab6f302 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1166,22 +1166,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
int type = resolveAddressType(dest);
- if (type == AMQDestination.QUEUE_TYPE &&
- dest.getLink().getReliability() == Reliability.UNSPECIFIED)
- {
- dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
- }
- else if (type == AMQDestination.TOPIC_TYPE &&
- dest.getLink().getReliability() == Reliability.UNSPECIFIED)
- {
- dest.getLink().setReliability(Reliability.UNRELIABLE);
- }
- else if (type == AMQDestination.TOPIC_TYPE &&
- dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE)
- {
- throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");
- }
-
switch (type)
{
case AMQDestination.QUEUE_TYPE:
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 71780f5714..0e9c81f2f6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -204,7 +204,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
{
boolean messageOk = true;
- // TODO Use a tag for fiding out if message filtering is done here or by the broker.
+ // TODO Use a tag for finding out if message filtering is done here or by the broker.
try
{
if (_messageSelectorFilter != null)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index 5f97d625b4..c73d800b14 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -20,18 +20,14 @@
*/
package org.apache.qpid.client.messaging.address;
-import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED;
-
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
-
public class Link
{
public enum FilterType { SQL92, XQUERY, SUBJECT }
- public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED }
+ public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }
protected String name;
protected String _filter;
@@ -42,7 +38,7 @@ public class Link
protected int _producerCapacity = 0;
protected Node node;
protected Subscription subscription;
- protected Reliability reliability = UNSPECIFIED;
+ protected Reliability reliability = Reliability.AT_LEAST_ONCE;
public Reliability getReliability()
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index b70b2f90e4..feae7c9573 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -1070,19 +1070,6 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported"));
}
-
- String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";
- try
- {
- AMQAnyDestination dest = new AMQAnyDestination(addr4);
- Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons = ssn.createConsumer(dest);
- fail("An exception should be thrown indicating it's an unsupported combination");
- }
- catch(Exception e)
- {
- assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics"));
- }
}
private void acceptModeTest(String address, int expectedQueueDepth) throws Exception
@@ -1286,4 +1273,52 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
Message m = consumer.receive(RECEIVE_TIMEOUT);
assertNull("Unexpected message received", m);
}
+
+ /**
+ * Tests that a client using a session in {@link Session#CLIENT_ACKNOWLEDGE} can correctly
+ * recover a session and re-receive the same message.
+ */
+ public void testTopicRereceiveAfterRecover() throws Exception
+ {
+ final Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ final Destination topic = jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}");
+
+ final MessageProducer prod = jmsSession.createProducer(topic);
+ final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic);
+ final Message sentMessage = jmsSession.createTextMessage("Hello");
+
+ prod.send(sentMessage);
+ Message receivedMessage = consForTopic1.receive(1000);
+ assertNotNull("message should be received by consumer", receivedMessage);
+
+ jmsSession.recover();
+ receivedMessage = consForTopic1.receive(1000);
+ assertNotNull("message should be re-received by consumer after recover", receivedMessage);
+ receivedMessage.acknowledge();
+ }
+
+ /**
+ * Tests that a client using a session in {@link Session#SESSION_TRANSACTED} can correctly
+ * rollback a session and re-receive the same message.
+ */
+ public void testTopicRereceiveAfterRollback() throws Exception
+ {
+ final Session jmsSession = _connection.createSession(true,Session.SESSION_TRANSACTED);
+ final Destination topic = jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}");
+
+ final MessageProducer prod = jmsSession.createProducer(topic);
+ final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic);
+ final Message sentMessage = jmsSession.createTextMessage("Hello");
+
+ prod.send(sentMessage);
+ jmsSession.commit();
+
+ Message receivedMessage = consForTopic1.receive(1000);
+ assertNotNull("message should be received by consumer", receivedMessage);
+
+ jmsSession.rollback();
+ receivedMessage = consForTopic1.receive(1000);
+ assertNotNull("message should be re-received by consumer after rollback", receivedMessage);
+ jmsSession.commit();
+ }
}