diff options
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 42 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 2 |
2 files changed, 26 insertions, 18 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d149683646..cfc79cc70c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -165,6 +165,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring..."); + _logger.warn("Consumers that exist: " + _consumers); + _logger.warn("Session hashcode: " + System.identityHashCode(this)); } else { @@ -958,26 +960,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param queueName * @return the consumer tag generated by the broker */ - private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow, - boolean noLocal, boolean exclusive, int acknowledgeMode, boolean nowait) throws AMQException + private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler, + boolean nowait) throws AMQException { //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag String tag = Integer.toString(_nextTag++); - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, - queueName, tag, noLocal, - acknowledgeMode == Session.NO_ACKNOWLEDGE, - exclusive, nowait); - if (nowait) + consumer.setConsumerTag(tag); + // we must register the consumer in the map before we actually start listening + _consumers.put(tag, consumer); + try { - protocolHandler.writeFrame(jmsConsume); + AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, + queueName, tag, consumer.isNoLocal(), + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, + consumer.isExclusive(), nowait); + if (nowait) + { + protocolHandler.writeFrame(jmsConsume); + } + else + { + protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + } } - else + catch (AMQException e) { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + // clean-up the map in the event of an error + _consumers.remove(tag); + throw e; } - return tag; } public Queue createQueue(String queueName) throws JMSException @@ -1354,12 +1367,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), - consumer.getPrefetchLow(), consumer.isNoLocal(), consumer.isExclusive(), - consumer.getAcknowledgeMode(), nowait); - - consumer.setConsumerTag(consumerTag); - _consumers.put(consumerTag, consumer); + consumeFromQueue(consumer, queueName, protocolHandler, nowait); } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 2448e14542..31a2e6bd82 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -280,7 +280,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { checkPreConditions(); - + acquireReceiving(); try |