diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2010-01-14 16:53:21 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2010-01-14 16:53:21 +0000 |
commit | 425c785ed930986ac0fb8dbbb9c0e22f6354b7ce (patch) | |
tree | cf1e0f4a62f81dba1b7bf97ba6c26fc768345722 | |
parent | 8799c99897559f7f27bcae67abc29187899f174e (diff) | |
download | qpid-python-425c785ed930986ac0fb8dbbb9c0e22f6354b7ce.tar.gz |
QPID-2340 : Fix ProducerFlowControlTest to call a synchronous operation between sends (merged from 0.5-dev)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@899296 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 19 insertions, 5 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 43f6fd8ad2..8a4fbe5a9b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1339,6 +1339,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic declareExchange(name, type, getProtocolHandler(), nowait); } + abstract public void sync() throws AMQException; + public int getAcknowledgeMode() throws JMSException { checkNotClosed(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 2324d441cc..2346ab5626 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -922,6 +922,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { return Serial.lt((int) currentMark, (int) deliveryTag); } + + public void sync() throws AMQException + { + _qpidSession.sync(); + } public AMQMessageDelegateFactory getMessageDelegateFactory() { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 862e23385a..9b84421612 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -584,5 +584,10 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B { return AMQMessageDelegateFactory.FACTORY_0_8; } + + public void sync() throws AMQException + { + declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); + } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index d139f8d8b4..aead8eda53 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -409,8 +409,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging consumer.receive(); //perform a synchronous op on the connection - ((AMQSession) consumerSession).declareExchange( - new AMQShortString("amq.direct"), new AMQShortString("direct"), false); + ((AMQSession) consumerSession).sync(); assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); @@ -436,12 +435,15 @@ public class ProducerFlowControlTest extends AbstractTestLogging producer.send(nextMessage(msg, producerSession)); _sentMessages.incrementAndGet(); + try { - Thread.sleep(sleepPeriod); + ((AMQSession)producerSession).sync(); } - catch (InterruptedException e) + catch (AMQException e) { + e.printStackTrace(); + throw new RuntimeException(e); } } } @@ -495,4 +497,4 @@ public class ProducerFlowControlTest extends AbstractTestLogging return _exception; } } -}
\ No newline at end of file +} |