diff options
| author | Robert Greig <rgreig@apache.org> | 2006-12-14 21:55:51 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2006-12-14 21:55:51 +0000 |
| commit | bda1d29e19486a9dd29291c5fdb7d9df4ed4f647 (patch) | |
| tree | 5ebd6b8be1af977d8236c17116d5c0ba633113cb /java/client/src/main | |
| parent | 295cdabf2cea8feb127b094cc123326404551fa4 (diff) | |
| download | qpid-python-bda1d29e19486a9dd29291c5fdb7d9df4ed4f647.tar.gz | |
QPID-193 Make createConsumer synchronous to avoid races in client code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487368 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8e93b19eea..0cbf5bac60 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -870,7 +870,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - registerConsumer(consumer); + registerConsumer(consumer, false); } catch (AMQException e) { @@ -948,7 +948,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @return the consumer tag generated by the broker */ private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow, - boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException + boolean noLocal, boolean exclusive, int acknowledgeMode, boolean nowait) throws AMQException { //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag @@ -957,9 +957,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, queueName, tag, noLocal, acknowledgeMode == Session.NO_ACKNOWLEDGE, - exclusive, true); - - protocolHandler.writeFrame(jmsConsume); + exclusive, nowait); + if (nowait) + { + protocolHandler.writeFrame(jmsConsume); + } + else + { + protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + } return tag; } @@ -1263,7 +1269,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param consumer * @throws AMQException */ - void registerConsumer(BasicMessageConsumer consumer) throws AMQException + void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException { AMQDestination amqd = consumer.getDestination(); @@ -1275,8 +1281,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(), - consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode()); + String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), + consumer.getPrefetchLow(), consumer.isNoLocal(), consumer.isExclusive(), + consumer.getAcknowledgeMode(), nowait); consumer.setConsumerTag(consumerTag); _consumers.put(consumerTag, consumer); @@ -1338,7 +1345,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator it = consumers.iterator(); it.hasNext();) { BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); - registerConsumer(consumer); + registerConsumer(consumer, true); } } |
