diff options
3 files changed, 82 insertions, 3 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index cfb4b2053e..0ae282a12e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -54,7 +54,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me */ protected AMQConnection _connection; - private String _messageSelector; + protected String _messageSelector; private boolean _noLocal; @@ -658,6 +658,8 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } else { + // we should not be allowed to add a message is the + // consumer is closed _synchronousQueue.put(jmsMessage); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 3a9fae1d91..be7e2363f4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -38,7 +38,6 @@ import javax.jms.JMSException; import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicLong; import java.util.Iterator; /** @@ -290,7 +289,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // TODO Use a tag for fiding out if message filtering is done here or by the broker. try { - if (getMessageSelector() != null && !getMessageSelector().equals("")) + if (_messageSelector != null && !_messageSelector.equals("")) { messageOk = _filter.matches(message); } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java new file mode 100644 index 0000000000..83d491baad --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpidity.nclient.impl; + +/** + * This class holds all the 0.10 client constants which value can be set + * through properties. + */ +public class Constants +{ + static + { + + String max="message_size_before_sync";// KB's + try + { + MAX_NOT_SYNC_DATA_LENGH=new Long(System.getProperties().getProperty(max, "200000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_SYNC_DATA_LENGH=200000000; + } + String flush="message_size_before_flush"; + try + { + MAX_NOT_FLUSH_DATA_LENGH=new Long(System.getProperties().getProperty(flush, "2000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_FLUSH_DATA_LENGH=20000000; + } + } + + /** + * The total message size in KBs that can be transferted before + * client and broker are synchronized. + * A sync will result in the client library releasing the sent messages + * from memory. (messages are kept + * in memory so client can reconnect to a broker in the event of a failure) + * <p> + * Property name: message_size_before_sync + * <p> + * Default value: 200000000 + */ + public static long MAX_NOT_SYNC_DATA_LENGH; + /** + * The total message size in KBs that can be transferted before + * messages are flushed. + * When a flush returns all messages have reached the broker. + * <p> + * Property name: message_size_before_flush + * <p> + * Default value: 200000000 + */ + public static long MAX_NOT_FLUSH_DATA_LENGH; + +} |