From bda1d29e19486a9dd29291c5fdb7d9df4ed4f647 Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Thu, 14 Dec 2006 21:55:51 +0000 Subject: QPID-193 Make createConsumer synchronous to avoid races in client code. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487368 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 25 ++++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) (limited to 'java/client/src/main') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8e93b19eea..0cbf5bac60 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -870,7 +870,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - registerConsumer(consumer); + registerConsumer(consumer, false); } catch (AMQException e) { @@ -948,7 +948,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @return the consumer tag generated by the broker */ private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow, - boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException + boolean noLocal, boolean exclusive, int acknowledgeMode, boolean nowait) throws AMQException { //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag @@ -957,9 +957,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, queueName, tag, noLocal, acknowledgeMode == Session.NO_ACKNOWLEDGE, - exclusive, true); - - protocolHandler.writeFrame(jmsConsume); + exclusive, nowait); + if (nowait) + { + protocolHandler.writeFrame(jmsConsume); + } + else + { + protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + } return tag; } @@ -1263,7 +1269,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param consumer * @throws AMQException */ - void registerConsumer(BasicMessageConsumer consumer) throws AMQException + void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException { AMQDestination amqd = consumer.getDestination(); @@ -1275,8 +1281,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(), - consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode()); + String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), + consumer.getPrefetchLow(), consumer.isNoLocal(), consumer.isExclusive(), + consumer.getAcknowledgeMode(), nowait); consumer.setConsumerTag(consumerTag); _consumers.put(consumerTag, consumer); @@ -1338,7 +1345,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator it = consumers.iterator(); it.hasNext();) { BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); - registerConsumer(consumer); + registerConsumer(consumer, true); } } -- cgit v1.2.1