summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java29
1 files changed, 29 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cf1d7cedeb..efcbfd5532 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -28,7 +28,10 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.BindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +39,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final RejectBehaviour _rejectBehaviour;
+
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
@@ -55,6 +60,25 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
+ if (destination.getRejectBehaviour() != null)
+ {
+ _rejectBehaviour = destination.getRejectBehaviour();
+ }
+ else
+ {
+ ConnectionURL connectionURL = connection.getConnectionURL();
+ String rejectBehaviour = connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR);
+ if (rejectBehaviour != null)
+ {
+ _rejectBehaviour = RejectBehaviour.valueOf(rejectBehaviour.toUpperCase());
+ }
+ else
+ {
+ // use the default value for all connections, if not set
+ rejectBehaviour = System.getProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.NORMAL.toString());
+ _rejectBehaviour = RejectBehaviour.valueOf( rejectBehaviour.toUpperCase());
+ }
+ }
}
void sendCancel() throws AMQException, FailoverException
@@ -89,4 +113,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
}
+
+ public RejectBehaviour getRejectBehaviour()
+ {
+ return _rejectBehaviour;
+ }
}