summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-09-23 13:00:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-09-23 13:00:05 +0000
commite78cd3d73d0ce9407ead5a9f91f9cb771affac52 (patch)
treef6881bcc5816e324d3c1f0c0c6e60a26f1742271
parent822f0622694b341b44845e798c058990a9f69b8d (diff)
downloadqpid-python-e78cd3d73d0ce9407ead5a9f91f9cb771affac52.tar.gz
QPID-3678 : [Java Client] Add support for setting link capacity to zero in ADDR addresses
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1626995 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java75
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java4
-rw-r--r--java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java39
-rw-r--r--java/test-profiles/JavaPre010Excludes2
5 files changed, 124 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 8f91a7db08..e0d8ac3702 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -389,7 +389,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty())
+ if (_0_10session.isStarted() && isMessageListenerSet() && _capacity == 0 && getSynchronousQueue().isEmpty())
{
messageFlow();
}
@@ -536,7 +536,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private long evaluateCapacity(AMQDestination destination)
{
long capacity = 0;
- if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0)
+ if (destination.getLink() != null && destination.getLink().getConsumerCapacity() >= 0)
{
capacity = destination.getLink().getConsumerCapacity();
}
@@ -547,4 +547,75 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return capacity;
}
+ @Override
+ public Message receive(final long l) throws JMSException
+ {
+ long capacity = getCapacity();
+ try
+ {
+ AMQSession_0_10 session = (AMQSession_0_10) getSession();
+
+ if (capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+
+ session.sync();
+
+ }
+
+ Message message = super.receive(l);
+
+ if (message == null && capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 0,
+ Option.UNRELIABLE);
+ session.sync();
+
+ message = super.receiveNoWait();
+ }
+ return message;
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ @Override
+ public Message receiveNoWait() throws JMSException
+ {
+ long capacity = getCapacity();
+ try
+ {
+ AMQSession_0_10 session = (AMQSession_0_10) getSession();
+
+ if (capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+
+ session.sync();
+ }
+ Message message = super.receiveNoWait();
+ if (message == null && capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 0,
+ Option.UNRELIABLE);
+ session.sync();
+
+ message = super.receiveNoWait();
+ }
+ return message;
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 116fd11942..81ccaee1f3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -242,22 +242,17 @@ public class AddressHelper
if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
{
- MapAccessor capacityProps = new MapAccessor(
- (Map) ((Map) _address.getOptions().get(LINK))
- .get(CAPACITY));
- link
- .setConsumerCapacity(capacityProps
- .getInt(CAPACITY_SOURCE) == null ? 0
- : capacityProps.getInt(CAPACITY_SOURCE));
- link
- .setProducerCapacity(capacityProps
- .getInt(CAPACITY_TARGET) == null ? 0
- : capacityProps.getInt(CAPACITY_TARGET));
+ MapAccessor capacityProps = new MapAccessor((Map) ((Map) _address.getOptions().get(LINK)).get(CAPACITY));
+
+ Integer sourceCapacity = capacityProps.getInt(CAPACITY_SOURCE);
+ link.setConsumerCapacity(sourceCapacity == null ? -1 : sourceCapacity);
+
+ Integer targetCapacity = capacityProps.getInt(CAPACITY_TARGET);
+ link.setProducerCapacity(targetCapacity == null ? -1 : targetCapacity);
}
else
{
- int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess
- .getInt(CAPACITY);
+ int cap = _linkPropAccess.getInt(CAPACITY) == null ? -1 : _linkPropAccess.getInt(CAPACITY);
link.setConsumerCapacity(cap);
link.setProducerCapacity(cap);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index 7e9cb3072a..b40abf3f98 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -38,8 +38,8 @@ public class Link
private FilterType _filterType = FilterType.SUBJECT;
private boolean _isNoLocal;
private boolean _isDurable;
- private int _consumerCapacity = 0;
- private int _producerCapacity = 0;
+ private int _consumerCapacity = -1;
+ private int _producerCapacity = -1;
private Subscription subscription;
private Reliability reliability = Reliability.AT_LEAST_ONCE;
private List<Binding> _bindings = Collections.emptyList();
diff --git a/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 1161e8dd59..0458313a4c 100644
--- a/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -454,6 +454,45 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
checkQueueForBindings(jmsSession,dest2,headersBinding);
}
+ public void testZeroCapacityForSynchronousReceive() throws Exception
+ {
+ Session session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session2 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String addressString = "ADDR:my-queue; {create: always, link:{capacity: 0}}";
+ Queue session1queue = session1.createQueue(addressString);
+ Queue session2queue = session1.createQueue(addressString);
+ MessageConsumer consumer1 = session1.createConsumer(session1queue);
+ MessageConsumer consumer1withSelector = session1.createConsumer(session1queue, "key1 = 1");
+ MessageConsumer consumer2withSelector = session2.createConsumer(session2queue, "key2 = 2");
+
+ _connection.start();
+
+ MessageProducer producer = session1.createProducer(session1queue);
+
+ Message m = session1.createMessage();
+ m.setIntProperty("key1", 1);
+ m.setIntProperty("key2", 2);
+ producer.send(m);
+
+ m = session1.createMessage();
+ m.setIntProperty("key1", 1);
+ producer.send(m);
+
+ m = session1.createMessage();
+ producer.send(m);
+
+ m = session1.createMessage();
+ m.setIntProperty("key2", 2);
+ producer.send(m);
+
+ assertNotNull("First message from queue should be received",consumer1withSelector.receive(1000l));
+ assertNotNull("Last message on queue should be received", consumer2withSelector.receive(1000l));
+ assertNotNull("Second message from queue should be received", consumer1.receive(1000l));
+ assertNull("Only message remaining shouldn't match selector",consumer1withSelector.receive(500l));
+ assertNotNull("Should have been one message remaining on queue",consumer1.receive(1000l));
+ assertNull("No messages should be remaining on queue",consumer1.receive(500l));
+ }
+
/**
* Test goal: Verifies the capacity property in address string is handled properly.
* Test strategy:
diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes
index e89753bef3..df8cb1bab9 100644
--- a/java/test-profiles/JavaPre010Excludes
+++ b/java/test-profiles/JavaPre010Excludes
@@ -30,6 +30,8 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMo
//QPID-3392: the Java broker does not yet implement exchange creation arguments
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs
+//QPID-3678: zero capacity not supported in 0-9-1
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testZeroCapacityForSynchronousReceive
//QPID-6037: the 0-9-1 client takes the view that if you don't specify the node type but you ask for a JMS Topic
// you want a topic behaviour. The 0-10 client thinks you must want a queue.
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSTopicIsTreatedAsQueueIn0_10