diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java | 29 |
1 files changed, 29 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index cf1d7cedeb..efcbfd5532 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -28,7 +28,10 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.*; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.BindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +39,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final RejectBehaviour _rejectBehaviour; + protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow, @@ -55,6 +60,25 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } + if (destination.getRejectBehaviour() != null) + { + _rejectBehaviour = destination.getRejectBehaviour(); + } + else + { + ConnectionURL connectionURL = connection.getConnectionURL(); + String rejectBehaviour = connectionURL.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR); + if (rejectBehaviour != null) + { + _rejectBehaviour = RejectBehaviour.valueOf(rejectBehaviour.toUpperCase()); + } + else + { + // use the default value for all connections, if not set + rejectBehaviour = System.getProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.NORMAL.toString()); + _rejectBehaviour = RejectBehaviour.valueOf( rejectBehaviour.toUpperCase()); + } + } } void sendCancel() throws AMQException, FailoverException @@ -89,4 +113,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { } + + public RejectBehaviour getRejectBehaviour() + { + return _rejectBehaviour; + } } |