diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-09-23 13:00:05 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-09-23 13:00:05 +0000 |
commit | e78cd3d73d0ce9407ead5a9f91f9cb771affac52 (patch) | |
tree | f6881bcc5816e324d3c1f0c0c6e60a26f1742271 | |
parent | 822f0622694b341b44845e798c058990a9f69b8d (diff) | |
download | qpid-python-e78cd3d73d0ce9407ead5a9f91f9cb771affac52.tar.gz |
QPID-3678 : [Java Client] Add support for setting link capacity to zero in ADDR addresses
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1626995 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 124 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 8f91a7db08..e0d8ac3702 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -389,7 +389,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { _syncReceive.set(true); } - if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty()) + if (_0_10session.isStarted() && isMessageListenerSet() && _capacity == 0 && getSynchronousQueue().isEmpty()) { messageFlow(); } @@ -536,7 +536,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private long evaluateCapacity(AMQDestination destination) { long capacity = 0; - if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0) + if (destination.getLink() != null && destination.getLink().getConsumerCapacity() >= 0) { capacity = destination.getLink().getConsumerCapacity(); } @@ -547,4 +547,75 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return capacity; } + @Override + public Message receive(final long l) throws JMSException + { + long capacity = getCapacity(); + try + { + AMQSession_0_10 session = (AMQSession_0_10) getSession(); + + if (capacity == 0 && getMessageListener() == null) + { + session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + + session.sync(); + + } + + Message message = super.receive(l); + + if (message == null && capacity == 0 && getMessageListener() == null) + { + session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 0, + Option.UNRELIABLE); + session.sync(); + + message = super.receiveNoWait(); + } + return message; + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + @Override + public Message receiveNoWait() throws JMSException + { + long capacity = getCapacity(); + try + { + AMQSession_0_10 session = (AMQSession_0_10) getSession(); + + if (capacity == 0 && getMessageListener() == null) + { + session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + + session.sync(); + } + Message message = super.receiveNoWait(); + if (message == null && capacity == 0 && getMessageListener() == null) + { + session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 0, + Option.UNRELIABLE); + session.sync(); + + message = super.receiveNoWait(); + } + return message; + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 116fd11942..81ccaee1f3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -242,22 +242,17 @@ public class AddressHelper if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) { - MapAccessor capacityProps = new MapAccessor( - (Map) ((Map) _address.getOptions().get(LINK)) - .get(CAPACITY)); - link - .setConsumerCapacity(capacityProps - .getInt(CAPACITY_SOURCE) == null ? 0 - : capacityProps.getInt(CAPACITY_SOURCE)); - link - .setProducerCapacity(capacityProps - .getInt(CAPACITY_TARGET) == null ? 0 - : capacityProps.getInt(CAPACITY_TARGET)); + MapAccessor capacityProps = new MapAccessor((Map) ((Map) _address.getOptions().get(LINK)).get(CAPACITY)); + + Integer sourceCapacity = capacityProps.getInt(CAPACITY_SOURCE); + link.setConsumerCapacity(sourceCapacity == null ? -1 : sourceCapacity); + + Integer targetCapacity = capacityProps.getInt(CAPACITY_TARGET); + link.setProducerCapacity(targetCapacity == null ? -1 : targetCapacity); } else { - int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess - .getInt(CAPACITY); + int cap = _linkPropAccess.getInt(CAPACITY) == null ? -1 : _linkPropAccess.getInt(CAPACITY); link.setConsumerCapacity(cap); link.setProducerCapacity(cap); } diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 7e9cb3072a..b40abf3f98 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -38,8 +38,8 @@ public class Link private FilterType _filterType = FilterType.SUBJECT; private boolean _isNoLocal; private boolean _isDurable; - private int _consumerCapacity = 0; - private int _producerCapacity = 0; + private int _consumerCapacity = -1; + private int _producerCapacity = -1; private Subscription subscription; private Reliability reliability = Reliability.AT_LEAST_ONCE; private List<Binding> _bindings = Collections.emptyList(); diff --git a/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 1161e8dd59..0458313a4c 100644 --- a/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -454,6 +454,45 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase checkQueueForBindings(jmsSession,dest2,headersBinding); } + public void testZeroCapacityForSynchronousReceive() throws Exception + { + Session session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String addressString = "ADDR:my-queue; {create: always, link:{capacity: 0}}"; + Queue session1queue = session1.createQueue(addressString); + Queue session2queue = session1.createQueue(addressString); + MessageConsumer consumer1 = session1.createConsumer(session1queue); + MessageConsumer consumer1withSelector = session1.createConsumer(session1queue, "key1 = 1"); + MessageConsumer consumer2withSelector = session2.createConsumer(session2queue, "key2 = 2"); + + _connection.start(); + + MessageProducer producer = session1.createProducer(session1queue); + + Message m = session1.createMessage(); + m.setIntProperty("key1", 1); + m.setIntProperty("key2", 2); + producer.send(m); + + m = session1.createMessage(); + m.setIntProperty("key1", 1); + producer.send(m); + + m = session1.createMessage(); + producer.send(m); + + m = session1.createMessage(); + m.setIntProperty("key2", 2); + producer.send(m); + + assertNotNull("First message from queue should be received",consumer1withSelector.receive(1000l)); + assertNotNull("Last message on queue should be received", consumer2withSelector.receive(1000l)); + assertNotNull("Second message from queue should be received", consumer1.receive(1000l)); + assertNull("Only message remaining shouldn't match selector",consumer1withSelector.receive(500l)); + assertNotNull("Should have been one message remaining on queue",consumer1.receive(1000l)); + assertNull("No messages should be remaining on queue",consumer1.receive(500l)); + } + /** * Test goal: Verifies the capacity property in address string is handled properly. * Test strategy: diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes index e89753bef3..df8cb1bab9 100644 --- a/java/test-profiles/JavaPre010Excludes +++ b/java/test-profiles/JavaPre010Excludes @@ -30,6 +30,8 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMo //QPID-3392: the Java broker does not yet implement exchange creation arguments org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs +//QPID-3678: zero capacity not supported in 0-9-1 +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testZeroCapacityForSynchronousReceive //QPID-6037: the 0-9-1 client takes the view that if you don't specify the node type but you ask for a JMS Topic // you want a topic behaviour. The 0-10 client thinks you must want a queue. org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSTopicIsTreatedAsQueueIn0_10 |