diff options
4 files changed, 51 insertions, 36 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index c26fe98568..721ab6f302 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1166,22 +1166,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic int type = resolveAddressType(dest); - if (type == AMQDestination.QUEUE_TYPE && - dest.getLink().getReliability() == Reliability.UNSPECIFIED) - { - dest.getLink().setReliability(Reliability.AT_LEAST_ONCE); - } - else if (type == AMQDestination.TOPIC_TYPE && - dest.getLink().getReliability() == Reliability.UNSPECIFIED) - { - dest.getLink().setReliability(Reliability.UNRELIABLE); - } - else if (type == AMQDestination.TOPIC_TYPE && - dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE) - { - throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics"); - } - switch (type) { case AMQDestination.QUEUE_TYPE: diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 71780f5714..0e9c81f2f6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -204,7 +204,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException { boolean messageOk = true; - // TODO Use a tag for fiding out if message filtering is done here or by the broker. + // TODO Use a tag for finding out if message filtering is done here or by the broker. try { if (_messageSelectorFilter != null) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 5f97d625b4..c73d800b14 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,18 +20,14 @@ */ package org.apache.qpid.client.messaging.address; -import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED; - import java.util.HashMap; import java.util.Map; -import org.apache.qpid.client.messaging.address.Node.QueueNode; - public class Link { public enum FilterType { SQL92, XQUERY, SUBJECT } - public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED } + public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE } protected String name; protected String _filter; @@ -42,7 +38,7 @@ public class Link protected int _producerCapacity = 0; protected Node node; protected Subscription subscription; - protected Reliability reliability = UNSPECIFIED; + protected Reliability reliability = Reliability.AT_LEAST_ONCE; public Reliability getReliability() { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index b70b2f90e4..feae7c9573 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1070,19 +1070,6 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported")); } - - String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}"; - try - { - AMQAnyDestination dest = new AMQAnyDestination(addr4); - Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - MessageConsumer cons = ssn.createConsumer(dest); - fail("An exception should be thrown indicating it's an unsupported combination"); - } - catch(Exception e) - { - assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics")); - } } private void acceptModeTest(String address, int expectedQueueDepth) throws Exception @@ -1286,4 +1273,52 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase Message m = consumer.receive(RECEIVE_TIMEOUT); assertNull("Unexpected message received", m); } + + /** + * Tests that a client using a session in {@link Session#CLIENT_ACKNOWLEDGE} can correctly + * recover a session and re-receive the same message. + */ + public void testTopicRereceiveAfterRecover() throws Exception + { + final Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); + final Destination topic = jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}"); + + final MessageProducer prod = jmsSession.createProducer(topic); + final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic); + final Message sentMessage = jmsSession.createTextMessage("Hello"); + + prod.send(sentMessage); + Message receivedMessage = consForTopic1.receive(1000); + assertNotNull("message should be received by consumer", receivedMessage); + + jmsSession.recover(); + receivedMessage = consForTopic1.receive(1000); + assertNotNull("message should be re-received by consumer after recover", receivedMessage); + receivedMessage.acknowledge(); + } + + /** + * Tests that a client using a session in {@link Session#SESSION_TRANSACTED} can correctly + * rollback a session and re-receive the same message. + */ + public void testTopicRereceiveAfterRollback() throws Exception + { + final Session jmsSession = _connection.createSession(true,Session.SESSION_TRANSACTED); + final Destination topic = jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}"); + + final MessageProducer prod = jmsSession.createProducer(topic); + final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic); + final Message sentMessage = jmsSession.createTextMessage("Hello"); + + prod.send(sentMessage); + jmsSession.commit(); + + Message receivedMessage = consForTopic1.receive(1000); + assertNotNull("message should be received by consumer", receivedMessage); + + jmsSession.rollback(); + receivedMessage = consForTopic1.receive(1000); + assertNotNull("message should be re-received by consumer after rollback", receivedMessage); + jmsSession.commit(); + } } |