summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-15 19:21:46 +0000
committerRobert Greig <rgreig@apache.org>2006-12-15 19:21:46 +0000
commit5f5eea7f3b85ec60403ec8079980f167100b9251 (patch)
tree3b9e2617d06cafb66422d241ea60029bc4bdb4fb /java
parent5d970981ae65579853676572cce96673dd66ffe2 (diff)
downloadqpid-python-5f5eea7f3b85ec60403ec8079980f167100b9251.tar.gz
QPID-194 Patch supplied by Rob Godfrey.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java42
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java2
2 files changed, 26 insertions, 18 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d149683646..cfc79cc70c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -165,6 +165,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (consumer == null)
{
_logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring...");
+ _logger.warn("Consumers that exist: " + _consumers);
+ _logger.warn("Session hashcode: " + System.identityHashCode(this));
}
else
{
@@ -958,26 +960,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param queueName
* @return the consumer tag generated by the broker
*/
- private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
- boolean noLocal, boolean exclusive, int acknowledgeMode, boolean nowait) throws AMQException
+ private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
+ boolean nowait) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
- queueName, tag, noLocal,
- acknowledgeMode == Session.NO_ACKNOWLEDGE,
- exclusive, nowait);
- if (nowait)
+ consumer.setConsumerTag(tag);
+ // we must register the consumer in the map before we actually start listening
+ _consumers.put(tag, consumer);
+ try
{
- protocolHandler.writeFrame(jmsConsume);
+ AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
+ queueName, tag, consumer.isNoLocal(),
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
+ consumer.isExclusive(), nowait);
+ if (nowait)
+ {
+ protocolHandler.writeFrame(jmsConsume);
+ }
+ else
+ {
+ protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ }
}
- else
+ catch (AMQException e)
{
- protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ // clean-up the map in the event of an error
+ _consumers.remove(tag);
+ throw e;
}
- return tag;
}
public Queue createQueue(String queueName) throws JMSException
@@ -1354,12 +1367,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(),
- consumer.getPrefetchLow(), consumer.isNoLocal(), consumer.isExclusive(),
- consumer.getAcknowledgeMode(), nowait);
-
- consumer.setConsumerTag(consumerTag);
- _consumers.put(consumerTag, consumer);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait);
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 2448e14542..31a2e6bd82 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -280,7 +280,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
checkPreConditions();
-
+
acquireReceiving();
try