diff options
author | Robert Greig <rgreig@apache.org> | 2006-10-20 21:19:00 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-10-20 21:19:00 +0000 |
commit | 67e13cee9877551206c3e1418155f5a7fc50d827 (patch) | |
tree | 2d98fee6175ae551e8f887c2ab93b1dc2c658bb7 | |
parent | e4240d67de8cc77c290fb40c3fa773ec5c910f49 (diff) | |
download | qpid-python-67e13cee9877551206c3e1418155f5a7fc50d827.tar.gz |
Merge from trunk up to revision 466241
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@466266 13f79535-47bb-0310-9956-ffa450edef68
29 files changed, 1805 insertions, 376 deletions
diff --git a/java/broker/etc/log4j.xml b/java/broker/etc/log4j.xml index 68d9691cc2..ef16c0c370 100644 --- a/java/broker/etc/log4j.xml +++ b/java/broker/etc/log4j.xml @@ -19,7 +19,7 @@ <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> <appender name="FileAppender" class="org.apache.log4j.FileAppender"> - <param name="File" value="${QPID_WORK}/log/qpid.log"/> + <param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/> <param name="Append" value="false"/> <layout class="org.apache.log4j.PatternLayout"> diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java index 78889e69b5..055af16159 100644 --- a/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -23,6 +23,10 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.server.ack.TxAck; +import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.exchange.MessageRouter; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; @@ -31,7 +35,6 @@ import org.apache.qpid.server.queue.NoConsumersException; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.txn.TxnOp; -import org.apache.qpid.server.util.OrderedMapHelper; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -93,36 +96,17 @@ public class AMQChannel private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); + private long _lastDeliveryTag; + private final AtomicBoolean _suspended = new AtomicBoolean(false); private final MessageRouter _exchanges; private final TxnBuffer _txnBuffer; - private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>(); - - public static class UnacknowledgedMessage - { - public final AMQMessage message; - public final String consumerTag; - public AMQQueue queue; + private TxAck ackOp; - public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag) - { - this.queue = queue; - this.message = message; - this.consumerTag = consumerTag; - } - - private void discard() throws AMQException - { - if (queue != null) - { - message.dequeue(queue); - } - message.decrementReference(); - } - } + private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>(); public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException @@ -352,7 +336,8 @@ public class AMQChannel { synchronized(_unacknowledgedMessageMapLock) { - _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag)); + _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); + _lastDeliveryTag = deliveryTag; checkSuspension(); } } @@ -444,13 +429,32 @@ public class AMQChannel { if (_transactional) { - try + //check that the tag exists to give early failure + if(!multiple || deliveryTag > 0) { - _txnBuffer.enlist(new Ack(getUnackedMessageFinder().getValues(deliveryTag, multiple))); + checkAck(deliveryTag); } - catch(NoSuchElementException e) + //we use a single txn op for all acks and update this op + //as new acks come in. If this is the first ack in the txn + //we will need to create and enlist the op. + if(ackOp == null) { - throw new AMQException("Received ack for unrecognised delivery tag: " + deliveryTag); + ackOp = new TxAck(new AckMap()); + _txnBuffer.enlist(ackOp); + } + //update the op to include this ack request + if(multiple && deliveryTag == 0) + { + synchronized(_unacknowledgedMessageMapLock) + { + //if have signalled to ack all, that refers only + //to all at this time + ackOp.update(_lastDeliveryTag, multiple); + } + } + else + { + ackOp.update(deliveryTag, multiple); } } else @@ -459,6 +463,17 @@ public class AMQChannel } } + private void checkAck(long deliveryTag) throws AMQException + { + synchronized(_unacknowledgedMessageMapLock) + { + if (!_unacknowledgedMessageMap.containsKey(deliveryTag)) + { + throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel"); + } + } + } + private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException { if (multiple) @@ -587,6 +602,16 @@ public class AMQChannel public void commit() throws AMQException { + if(ackOp != null) + { + ackOp.consolidate(); + if(ackOp.checkPersistent()) + { + _txnBuffer.containsPersistentChanges(); + } + ackOp = null;//already enlisted, after commit will reset regardless of outcome + } + _txnBuffer.commit(); //TODO: may need to return 'immediate' messages at this point } @@ -636,69 +661,22 @@ public class AMQChannel _returns.clear(); } - private OrderedMapHelper<Long, UnacknowledgedMessage> getUnackedMessageFinder() - { - return new OrderedMapHelper<Long, UnacknowledgedMessage>(_unacknowledgedMessageMap, _unacknowledgedMessageMapLock, 0L); - } - - - private class Ack implements TxnOp + //we use this wrapper to ensure we are always using the correct + //map instance (its not final unfortunately) + private class AckMap implements UnacknowledgedMessageMap { - private final Map<Long, UnacknowledgedMessage> _unacked; - - Ack(Map<Long, UnacknowledgedMessage> unacked) throws AMQException + public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs) { - _unacked = unacked; - - //if any of the messages in unacked are persistent the txn - //buffer must be marked as persistent: - for(UnacknowledgedMessage msg : _unacked.values()) - { - if(msg.message.isPersistent()) - { - _txnBuffer.containsPersistentChanges(); - break; - } - } + impl().collect(deliveryTag, multiple, msgs); } - - public void prepare() throws AMQException + public void remove(List<UnacknowledgedMessage> msgs) { - //make persistent changes, i.e. dequeue and decrementReference - for(UnacknowledgedMessage msg : _unacked.values()) - { - msg.discard(); - } - } - - public void undoPrepare() - { - //decrementReference is annoyingly untransactional (due to - //in memory counter) so if we failed in prepare for full - //txn, this op will have to compensate by fixing the count - //in memory (persistent changes will be rolled back by store) - for(UnacknowledgedMessage msg : _unacked.values()) - { - - msg.message.incrementReference(); - } - } - - public void commit() - { - //remove the unacked messages from the channels map - synchronized(_unacknowledgedMessageMapLock) - { - for(long tag : _unacked.keySet()) - { - _unacknowledgedMessageMap.remove(tag); - } - } - + impl().remove(msgs); } - public void rollback() + private UnacknowledgedMessageMap impl() { + return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap); } } @@ -745,17 +723,6 @@ public class AMQChannel public void prepare() throws AMQException { - //the routers reference can now be released - _msg.decrementReference(); - try - { - _msg.checkDeliveredToConsumer(); - } - catch(NoConsumersException e) - { - //TODO: store this for delivery after the commit-ok - _returns.add(e.getReturnMessage(_channelId)); - } } public void undoPrepare() @@ -767,6 +734,27 @@ public class AMQChannel public void commit() { + //The routers reference can now be released. This is done + //here to ensure that it happens after the queues that + //enqueue it have incremented their counts (which as a + //memory only operation is done in the commit phase). + try + { + _msg.decrementReference(); + } + catch(AMQException e) + { + _log.error("On commiting transaction, failed to cleanup unused message: " + e, e); + } + try + { + _msg.checkDeliveredToConsumer(); + } + catch(NoConsumersException e) + { + //TODO: store this for delivery after the commit-ok + _returns.add(e.getReturnMessage(_channelId)); + } } public void rollback() diff --git a/java/broker/src/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/org/apache/qpid/server/ack/TxAck.java new file mode 100644 index 0000000000..0d502a8f6e --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/ack/TxAck.java @@ -0,0 +1,129 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.ack; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.txn.TxnOp; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +/** + * A TxnOp implementation for handling accumulated acks + */ +public class TxAck implements TxnOp +{ + private final UnacknowledgedMessageMap _map; + private final List <UnacknowledgedMessage> _unacked = new LinkedList<UnacknowledgedMessage>(); + private final List<Long> _individual = new LinkedList<Long>(); + private long _deliveryTag; + private boolean _multiple; + + public TxAck(UnacknowledgedMessageMap map) + { + _map = map; + } + + public void update(long deliveryTag, boolean multiple) + { + if(!multiple) + { + //have acked a single message that is not part of + //the previously acked region so record + //individually + _individual.add(deliveryTag);//_multiple && !multiple + } + else if(deliveryTag > _deliveryTag) + { + //have simply moved the last acked message on a + //bit + _deliveryTag = deliveryTag; + _multiple = true; + } + } + + public void consolidate() + { + //lookup all the unacked messages that have been acked in this transaction + if(_multiple) + { + //get all the unacked messages for the accumulated + //multiple acks + _map.collect(_deliveryTag, true, _unacked); + } + //get any unacked messages for individual acks outside the + //range covered by multiple acks + for(long tag : _individual) + { + if(_deliveryTag < tag) + { + _map.collect(tag, false, _unacked); + } + } + } + + public boolean checkPersistent() throws AMQException + { + //if any of the messages in unacked are persistent the txn + //buffer must be marked as persistent: + for(UnacknowledgedMessage msg : _unacked) + { + if(msg.message.isPersistent()) + { + return true; + } + } + return false; + } + + public void prepare() throws AMQException + { + //make persistent changes, i.e. dequeue and decrementReference + for(UnacknowledgedMessage msg : _unacked) + { + msg.discard(); + } + } + + public void undoPrepare() + { + //decrementReference is annoyingly untransactional (due to + //in memory counter) so if we failed in prepare for full + //txn, this op will have to compensate by fixing the count + //in memory (persistent changes will be rolled back by store) + for(UnacknowledgedMessage msg : _unacked) + { + msg.message.incrementReference(); + } + } + + public void commit() + { + //remove the unacked messages from the channels map + _map.remove(_unacked); + } + + public void rollback() + { + } +} + diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java new file mode 100644 index 0000000000..f13ecef7ac --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.ack; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; + +public class UnacknowledgedMessage +{ + public final AMQMessage message; + public final String consumerTag; + public final long deliveryTag; + public AMQQueue queue; + + public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag, long deliveryTag) + { + this.queue = queue; + this.message = message; + this.consumerTag = consumerTag; + this.deliveryTag = deliveryTag; + } + + public void discard() throws AMQException + { + if (queue != null) + { + message.dequeue(queue); + } + message.decrementReference(); + } +} + diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java new file mode 100644 index 0000000000..ccb55f7e2d --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java @@ -0,0 +1,27 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.ack; + +import java.util.List; + +public interface UnacknowledgedMessageMap +{ + public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs); + public void remove(List<UnacknowledgedMessage> msgs); +} + diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java new file mode 100644 index 0000000000..117793930f --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -0,0 +1,81 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.ack; + +import java.util.List; +import java.util.Map; + +public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap +{ + private final Object _lock; + private Map<Long, UnacknowledgedMessage> _map; + + public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map) + { + _lock = lock; + _map = map; + } + + public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs) + { + if (multiple) + { + collect(deliveryTag, msgs); + } + else + { + msgs.add(get(deliveryTag)); + } + + } + + public void remove(List<UnacknowledgedMessage> msgs) + { + synchronized(_lock) + { + for(UnacknowledgedMessage msg : msgs) + { + _map.remove(msg.deliveryTag); + } + } + } + + private UnacknowledgedMessage get(long key) + { + synchronized(_lock) + { + return _map.get(key); + } + } + + private void collect(long key, List<UnacknowledgedMessage> msgs) + { + synchronized(_lock) + { + for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) + { + msgs.add(entry.getValue()); + if (entry.getKey() == key) + { + break; + } + } + } + } +} + diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java index 6573be782b..7666b5b3f8 100644 --- a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java @@ -255,7 +255,7 @@ public class AMQMessage * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. */ - public void decrementReference() throws AMQException + public void decrementReference() throws MessageCleanupException { // note that the operation of decrementing the reference count and then removing the message does not // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after @@ -263,7 +263,16 @@ public class AMQMessage // not relying on the all the increments having taken place before the delivery manager decrements. if (_referenceCount.decrementAndGet() == 0) { - _store.removeMessage(_messageId); + try + { + _store.removeMessage(_messageId); + } + catch(AMQException e) + { + //to maintain consistency, we revert the count + incrementReference(); + throw new MessageCleanupException(_messageId, e); + } } } diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java index 6c795590f6..7fb6ddcb5c 100644 --- a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java @@ -707,8 +707,16 @@ public class AMQQueue implements Managable { try { - msg.decrementReference(); msg.dequeue(this); + msg.decrementReference(); + } + catch(MessageCleanupException e) + { + //Message was dequeued, but could notthen be deleted + //though it is no longer referenced. This should be very + //rare and can be detected and cleaned up on recovery or + //done through some form of manual intervention. + _logger.error(e, e); } catch(AMQException e) { @@ -777,7 +785,8 @@ public class AMQQueue implements Managable public void prepare() throws AMQException { - record(_msg); + //do the persistent part of the record() + _msg.enqueue(AMQQueue.this); } public void undoPrepare() @@ -786,6 +795,9 @@ public class AMQQueue implements Managable public void commit() { + //do the memeory part of the record() + _msg.incrementReference(); + //then process the message try { process(_msg); @@ -799,14 +811,6 @@ public class AMQQueue implements Managable public void rollback() { - try - { - _msg.decrementReference(); - } - catch (AMQException e) - { - _logger.error("Error rolling back a queue delivery: " + e, e); - } } } diff --git a/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java new file mode 100644 index 0000000000..6f0fa285b1 --- /dev/null +++ b/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; + +/** + * Signals that the removal of a message once its refcount reached + * zero failed. + */ +public class MessageCleanupException extends AMQException +{ + public MessageCleanupException(long messageId, AMQException e) + { + super("Failed to cleanup message with id " + messageId, e); + } +} diff --git a/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java b/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java deleted file mode 100644 index 2c24944317..0000000000 --- a/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.server.util; - -import java.util.List; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.NoSuchElementException; - -/** - * Utility class used by AMQChannel to retrieve unacknowledged - * messages. Made generic to avoid exposing the inner class in - * AMQChannel. Put in this package to keep ot out the way. - */ -public class OrderedMapHelper<K, V> -{ - private final Map<K, V> _map; - private final Object _lock; - private final K _wildcard; - - public OrderedMapHelper(Map<K, V> map, Object lock, K wildcard) - { - _map = map; - _lock = lock; - _wildcard = wildcard; - } - - /** - * Assumes the map passed in is ordered. Returns a copy of the - * map containing an individual key-value pair or a list of - * key-values upto and including the one matching the given - * key. If multiple == true and the key == the wildcard specified - * on construction, then all the values in the map will be - * returned. - */ - public Map<K, V> getValues(K key, boolean multiple) throws NoSuchElementException - { - if (multiple) - { - if(key == _wildcard) - { - synchronized(_lock) - { - return new LinkedHashMap<K, V>(_map); - } - } - else - { - return getValues(key); - } - } - else - { - Map<K, V> values = new LinkedHashMap<K, V>(); - values.put(key, getValue(key)); - return values; - } - } - - private V getValue(K key) throws NoSuchElementException - { - V value; - synchronized(_lock) - { - value = _map.get(key); - } - - if(value == null) - { - throw new NoSuchElementException(); - } - else - { - return value; - } - } - - private Map<K, V> getValues(K key) throws NoSuchElementException - { - Map<K, V> values = new LinkedHashMap<K, V>(); - synchronized(_lock) - { - if (!_map.containsKey(key)) - { - throw new NoSuchElementException(); - } - - for(Map.Entry<K, V> entry : _map.entrySet()) - { - values.put(entry.getKey(), entry.getValue()); - if (entry.getKey() == key) - { - break; - } - } - } - return values; - } - -}
\ No newline at end of file diff --git a/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java new file mode 100644 index 0000000000..4bf23ec25a --- /dev/null +++ b/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java @@ -0,0 +1,186 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.ack; + +import junit.framework.JUnit4TestAdapter; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import org.junit.Ignore; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.store.TestableMemoryMessageStore; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; + +public class TxAckTest +{ + private Scenario individual; + private Scenario multiple; + private Scenario combined; + + @Before + public void setup() throws Exception + { + //ack only 5th msg + individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l)); + individual.update(5, false); + + //ack all up to and including 5th msg + multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l)); + multiple.update(5, true); + + //leave only 8th and 9th unacked + combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l)); + combined.update(3, false); + combined.update(5, true); + combined.update(7, true); + combined.update(2, true);//should be ignored + combined.update(1, false);//should be ignored + combined.update(10, false); + } + + @Test + public void prepare() throws AMQException + { + individual.prepare(); + multiple.prepare(); + combined.prepare(); + } + + @Test + public void undoPrepare() throws AMQException + { + individual.undoPrepare(); + multiple.undoPrepare(); + combined.undoPrepare(); + } + + @Test + public void commit() throws AMQException + { + individual.commit(); + multiple.commit(); + combined.commit(); + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(TxAckTest.class); + } + + private class Scenario + { + private final LinkedHashMap<Long, UnacknowledgedMessage> _messages = new LinkedHashMap<Long, UnacknowledgedMessage>(); + private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(_messages, _messages); + private final TxAck _op = new TxAck(_map); + private final List<Long> _acked; + private final List<Long> _unacked; + + Scenario(int messageCount, List<Long> acked, List<Long> unacked) + { + for(int i = 0; i < messageCount; i++) + { + long deliveryTag = i + 1; + _messages.put(deliveryTag, new UnacknowledgedMessage(null, new TestMessage(deliveryTag), null, deliveryTag)); + } + _acked = acked; + _unacked = unacked; + } + + void update(long deliverytag, boolean multiple) + { + _op.update(deliverytag, multiple); + } + + private void assertCount(List<Long> tags, int expected) + { + for(long tag : tags) + { + UnacknowledgedMessage u = _messages.get(tag); + assertTrue("Message not found for tag " + tag, u != null); + ((TestMessage) u.message).assertCountEquals(expected); + } + } + + void prepare() throws AMQException + { + _op.consolidate(); + _op.prepare(); + + assertCount(_acked, -1); + assertCount(_unacked, 0); + + } + void undoPrepare() + { + _op.consolidate(); + _op.undoPrepare(); + + assertCount(_acked, 1); + assertCount(_unacked, 0); + } + + void commit() + { + _op.consolidate(); + _op.commit(); + + + //check acked messages are removed from map + HashSet<Long> keys = new HashSet<Long>(_messages.keySet()); + keys.retainAll(_acked); + assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty()); + //check unacked messages are still in map + keys = new HashSet<Long>(_unacked); + keys.removeAll(_messages.keySet()); + assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty()); + } + } + + private class TestMessage extends AMQMessage + { + private final long _tag; + private int _count; + + TestMessage(long tag) + { + super(new TestableMemoryMessageStore(), null); + _tag = tag; + } + + public void incrementReference() + { + _count++; + } + + public void decrementReference() + { + _count--; + } + + void assertCountEquals(int expected) + { + assertEquals("Wrong count for message with tag " + _tag, expected, _count); + } + } +} diff --git a/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java new file mode 100644 index 0000000000..3d7db01365 --- /dev/null +++ b/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.ack; + +import junit.framework.JUnit4TestAdapter; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + TxAckTest.class +}) +public class UnitTests +{ + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(UnitTests.class); + } +} diff --git a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java index 8ce006b3ae..f22ccb8caf 100644 --- a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.util.TestApplicationRegistry; @@ -93,15 +94,15 @@ public class AckTest final int msgCount = 10; publishMessages(msgCount); - Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount); - Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator(); + Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator(); for (int i = 1; i <= map.size(); i++) { - Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next(); + Map.Entry<Long, UnacknowledgedMessage> entry = it.next(); assertTrue(entry.getKey() == i); - AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue(); + UnacknowledgedMessage unackedMsg = entry.getValue(); assertTrue(unackedMsg.queue == _queue); } assertTrue(_messageStore.getMessageMap().size() == msgCount); @@ -118,7 +119,7 @@ public class AckTest final int msgCount = 10; publishMessages(msgCount); - Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); assertTrue(_messageStore.getMessageMap().size() == 0); } @@ -135,16 +136,16 @@ public class AckTest publishMessages(msgCount); _channel.acknowledgeMessage(5, false); - Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount - 1); - Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator(); + Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator(); int i = 1; while (i <= map.size()) { - Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next(); + Map.Entry<Long, UnacknowledgedMessage> entry = it.next(); assertTrue(entry.getKey() == i); - AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue(); + UnacknowledgedMessage unackedMsg = entry.getValue(); assertTrue(unackedMsg.queue == _queue); // 5 is the delivery tag of the message that *should* be removed if (++i == 5) @@ -166,16 +167,16 @@ public class AckTest publishMessages(msgCount); _channel.acknowledgeMessage(5, true); - Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 5); - Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator(); + Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator(); int i = 1; while (i <= map.size()) { - Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next(); + Map.Entry<Long, UnacknowledgedMessage> entry = it.next(); assertTrue(entry.getKey() == i + 5); - AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue(); + UnacknowledgedMessage unackedMsg = entry.getValue(); assertTrue(unackedMsg.queue == _queue); ++i; } @@ -193,16 +194,16 @@ public class AckTest publishMessages(msgCount); _channel.acknowledgeMessage(0, true); - Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator(); + Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator(); int i = 1; while (i <= map.size()) { - Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next(); + Map.Entry<Long, UnacknowledgedMessage> entry = it.next(); assertTrue(entry.getKey() == i + 5); - AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue(); + UnacknowledgedMessage unackedMsg = entry.getValue(); assertTrue(unackedMsg.queue == _queue); ++i; } @@ -221,7 +222,7 @@ public class AckTest // at this point we should have sent out only 5 messages with a further 5 queued // up in the channel which should be suspended assertTrue(_subscription.isSuspended()); - Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 5); _channel.acknowledgeMessage(5, true); assertTrue(!_subscription.isSuspended()); diff --git a/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java b/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java deleted file mode 100644 index 74340e98c0..0000000000 --- a/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.server.util; - -import junit.framework.JUnit4TestAdapter; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import org.junit.Before; -import org.junit.Test; -import org.junit.Ignore; -import org.apache.qpid.AMQException; - -import java.util.LinkedHashMap; -import java.util.Map; - -public class OrderedMapHelperTest -{ - private final Object lock = new Object(); - private final Map<Integer, String> map = new LinkedHashMap<Integer, String>(); - private final OrderedMapHelper<Integer, String> helper = new OrderedMapHelper<Integer, String>(map, lock, 0); - - @Before - public void setup() throws Exception - { - map.put(1, "One"); - map.put(2, "Two"); - map.put(5, "Five"); - map.put(8, "Eight"); - map.put(10, "Ten"); - } - - @Test - public void specific() - { - Map<Integer, String> slice = helper.getValues(5, false); - assertEquals(1, slice.size()); - assertTrue(slice.containsKey(5)); - assertTrue(slice.containsValue("Five")); - assertEquals("Five", slice.get(5)); - } - - @Test - public void multiple() - { - Map<Integer, String> slice = helper.getValues(5, true); - assertEquals(3, slice.size()); - - assertTrue(slice.containsKey(1)); - assertTrue(slice.containsKey(2)); - assertTrue(slice.containsKey(5)); - - assertEquals("One", slice.get(1)); - assertEquals("Two", slice.get(2)); - assertEquals("Five", slice.get(5)); - } - - @Test - public void all() - { - Map<Integer, String> slice = helper.getValues(0/*the 'wildcard'*/, true); - assertEquals(5, slice.size()); - - assertTrue(slice.containsKey(1)); - assertTrue(slice.containsKey(2)); - assertTrue(slice.containsKey(5)); - assertTrue(slice.containsKey(8)); - assertTrue(slice.containsKey(10)); - - assertEquals("One", slice.get(1)); - assertEquals("Two", slice.get(2)); - assertEquals("Five", slice.get(5)); - assertEquals("Eight", slice.get(8)); - assertEquals("Ten", slice.get(10)); - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(OrderedMapHelperTest.class); - } -} diff --git a/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java index 5c2242fb0d..d6cc471413 100644 --- a/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java +++ b/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java @@ -22,7 +22,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Suite; @RunWith(Suite.class) -@Suite.SuiteClasses({LoggingProxyTest.class, OrderedMapHelperTest.class}) +@Suite.SuiteClasses({LoggingProxyTest.class}) public class UnitTests { public static junit.framework.Test suite() diff --git a/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java index 5b8de7a13f..257a9374e8 100644 --- a/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java +++ b/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java @@ -18,12 +18,18 @@ package org.apache.qpid.client; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.url.BindingURL; /** * A destination backed by a headers exchange */ public class AMQHeadersExchange extends AMQDestination { + public AMQHeadersExchange(BindingURL binding) + { + this(binding.getExchangeName()); + } + public AMQHeadersExchange(String queueName) { super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null); diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 54ffc979af..8b2c2ec04d 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -35,12 +35,12 @@ import org.apache.qpid.url.URLSyntaxException; import javax.jms.*; import javax.jms.IllegalStateException; import java.io.Serializable; +import java.text.MessageFormat; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.text.MessageFormat; public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -720,18 +720,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageConsumer createConsumer(Destination destination) throws JMSException { - return createConsumer(destination, _defaultPrefetchHighMark, false, false, null); + return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { - return createConsumer(destination, _defaultPrefetchHighMark, false, false, messageSelector); + return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { - return createConsumer(destination, _defaultPrefetchHighMark, noLocal, false, messageSelector); + return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, @@ -740,7 +740,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { - return createConsumer(destination, prefetch, noLocal, exclusive, selector, null); + return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null); + } + + + public MessageConsumer createConsumer(Destination destination, + int prefetchHigh, + int prefetchLow, + boolean noLocal, + boolean exclusive, + String selector) throws JMSException + { + return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); } public MessageConsumer createConsumer(Destination destination, @@ -750,12 +761,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { - return createConsumerImpl(destination, prefetch, noLocal, exclusive, + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, + selector, rawSelector); + } + + public MessageConsumer createConsumer(Destination destination, + int prefetchHigh, + int prefetchLow, + boolean noLocal, + boolean exclusive, + String selector, + FieldTable rawSelector) throws JMSException + { + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector); } protected MessageConsumer createConsumerImpl(final Destination destination, - final int prefetch, + final int prefetchHigh, + final int prefetchLow, final boolean noLocal, final boolean exclusive, final String selector, @@ -780,7 +804,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal, _messageFactoryRegistry, AMQSession.this, - protocolHandler, ft, prefetch, exclusive, + protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, _acknowledgeMode); try @@ -862,9 +886,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param queueName * @return the consumer tag generated by the broker */ - private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetch, + private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException { + //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag String tag = Integer.toString(_nextTag++); @@ -1118,8 +1143,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetch(), consumer.isNoLocal(), - consumer.isExclusive(), consumer.getAcknowledgeMode()); + String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(), + consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode()); consumer.setConsumerTag(consumerTag); _consumers.put(consumerTag, consumer); diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java index b46c5f111d..a6f89fd221 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java @@ -91,9 +91,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private FieldTable _rawSelectorFieldTable; /** - * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover + * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover */ - private int _prefetch; + private int _prefetchHigh; + + /** + * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover + */ + private int _prefetchLow; /** * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover @@ -118,10 +123,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private long _lastDeliveryTag; + /** + * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. + * Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow + */ + private boolean _dups_ok_acknowledge_send; + BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetch, - boolean exclusive, int acknowledgeMode) + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, + int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode) { _channelId = channelId; _connection = connection; @@ -132,7 +143,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _session = session; _protocolHandler = protocolHandler; _rawSelectorFieldTable = rawSelectorFieldTable; - _prefetch = prefetch; + _prefetchHigh = prefetchHigh; + _prefetchLow = prefetchLow; _exclusive = exclusive; _acknowledgeMode = acknowledgeMode; } @@ -232,7 +244,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public int getPrefetch() { - return _prefetch; + return _prefetchHigh; + } + + public int getPrefetchHigh() + { + return _prefetchHigh; + } + + public int getPrefetchLow() + { + return _prefetchLow; } public boolean isNoLocal() @@ -309,10 +331,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * We can get back either a Message or an exception from the queue. This method examines the argument and deals * with it by throwing it (if an exception) or returning it (in any other case). + * * @param o * @return a message only if o is a Message * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not - * a JMSException is created with the linked exception set appropriately + * a JMSException is created with the linked exception set appropriately */ private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException @@ -335,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) { @@ -370,8 +393,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case * of a message listener or a synchronous receive() caller. + * * @param messageFrame the raw unprocessed mesage - * @param channelId channel on which this message was sent + * @param channelId channel on which this message was sent */ void notifyMessage(UnprocessedMessage messageFrame, int channelId) { @@ -435,7 +459,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetch) + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } + + if (_dups_ok_acknowledge_send) { _session.acknowledgeMessage(msg.getDeliveryTag(), true); } diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java index b181490fdd..acb60f63ae 100644 --- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -47,6 +47,9 @@ import java.util.concurrent.ConcurrentMap; */ public class AMQProtocolSession implements ProtocolVersionList { + + private static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; + private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; @@ -59,6 +62,8 @@ public class AMQProtocolSession implements ProtocolVersionList private final IoSession _minaProtocolSession; + private WriteFuture _lastWriteFuture; + /** * The handler from which this session was created and which is used to handle protocol events. * We send failover events to the handler. @@ -255,7 +260,7 @@ public class AMQProtocolSession implements ProtocolVersionList */ public void writeFrame(AMQDataBlock frame) { - _minaProtocolSession.write(frame); + writeFrame(frame, false); } public void writeFrame(AMQDataBlock frame, boolean wait) @@ -265,6 +270,10 @@ public class AMQProtocolSession implements ProtocolVersionList { f.join(); } + else + { + _lastWriteFuture = f; + } } public void addSessionByChannel(int channelId, AMQSession session) @@ -342,6 +351,12 @@ public class AMQProtocolSession implements ProtocolVersionList public void closeProtocolSession() { + _logger.debug("Waiting for last write to join."); + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + } + _logger.debug("Closing protocol session"); final CloseFuture future = _minaProtocolSession.close(); future.join(); diff --git a/java/client/src/org/apache/qpid/jms/Session.java b/java/client/src/org/apache/qpid/jms/Session.java index d369c08aa1..1440ace2b6 100644 --- a/java/client/src/org/apache/qpid/jms/Session.java +++ b/java/client/src/org/apache/qpid/jms/Session.java @@ -42,6 +42,13 @@ public interface Session extends javax.jms.Session boolean exclusive, String selector) throws JMSException; + MessageConsumer createConsumer(Destination destination, + int prefetchHigh, + int prefetchLow, + boolean noLocal, + boolean exclusive, + String selector) throws JMSException; + /** * @return the prefetch value used by default for consumers created on this session. */ diff --git a/java/client/src/org/apache/qpid/jndi/Example.properties b/java/client/src/org/apache/qpid/jndi/Example.properties new file mode 100644 index 0000000000..ae2859f8e1 --- /dev/null +++ b/java/client/src/org/apache/qpid/jndi/Example.properties @@ -0,0 +1,21 @@ +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialConextFactory + +# use the following property to configure the default connector +#java.naming.provider.url - ignored. + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='vm://:1' + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.MyQueue = example.MyQueue + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.ibmStocks = stocks.nyse.ibm + +# Register an AMQP destination in JNDI +# NOTE: Qpid currently only supports direct,topics and headers +# destination.[jniName] = [BindingURL] +destination.direct = direct://amq.direct//directQueue diff --git a/java/client/src/org/apache/qpid/jndi/NameParserImpl.java b/java/client/src/org/apache/qpid/jndi/NameParserImpl.java new file mode 100644 index 0000000000..a3174aec7a --- /dev/null +++ b/java/client/src/org/apache/qpid/jndi/NameParserImpl.java @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.jndi; + +import javax.naming.CompositeName; +import javax.naming.Name; +import javax.naming.NameParser; +import javax.naming.NamingException; + +/** + * A default implementation of {@link NameParser} + * <p/> + * Based on class from ActiveMQ. + */ +public class NameParserImpl implements NameParser +{ + public Name parse(String name) throws NamingException + { + return new CompositeName(name); + } +} diff --git a/java/client/src/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java new file mode 100644 index 0000000000..7691a607ba --- /dev/null +++ b/java/client/src/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -0,0 +1,289 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.jndi; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQHeadersExchange; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class PropertiesFileInitialContextFactory implements InitialContextFactory +{ + protected final Logger _logger = Logger.getLogger(getClass()); + + private String CONNECTION_FACTORY_PREFIX = "connectionfactory."; + private String DESTINATION_PREFIX = "destination."; + private String QUEUE_PREFIX = "queue."; + private String TOPIC_PREFIX = "topic."; + + public Context getInitialContext(Hashtable environment) throws NamingException + { + Map data = new ConcurrentHashMap(); + + createConnectionFactories(data, environment); + + createDestinations(data, environment); + + createQueues(data, environment); + + createTopics(data, environment); + + return createContext(data, environment); + } + + // Implementation methods + //------------------------------------------------------------------------- + protected ReadOnlyContext createContext(Map data, Hashtable environment) + { + return new ReadOnlyContext(environment, data); + } + + protected void createConnectionFactories(Map data, Hashtable environment) + { + for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) + { + Map.Entry entry = (Map.Entry) iter.next(); + String key = entry.getKey().toString(); + if (key.startsWith(CONNECTION_FACTORY_PREFIX)) + { + String jndiName = key.substring(CONNECTION_FACTORY_PREFIX.length()); + ConnectionFactory cf = createFactory(entry.getValue().toString()); + if (cf != null) + { + data.put(jndiName, cf); + } + } + } + } + + protected void createDestinations(Map data, Hashtable environment) + { + for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) + { + Map.Entry entry = (Map.Entry) iter.next(); + String key = entry.getKey().toString(); + if (key.startsWith(DESTINATION_PREFIX)) + { + String jndiName = key.substring(DESTINATION_PREFIX.length()); + Destination dest = createDestination(entry.getValue().toString()); + if (dest != null) + { + data.put(jndiName, dest); + } + } + } + } + + protected void createQueues(Map data, Hashtable environment) + { + for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) + { + Map.Entry entry = (Map.Entry) iter.next(); + String key = entry.getKey().toString(); + if (key.startsWith(QUEUE_PREFIX)) + { + String jndiName = key.substring(QUEUE_PREFIX.length()); + Queue q = createQueue(entry.getValue().toString()); + if (q != null) + { + data.put(jndiName, q); + } + } + } + } + + protected void createTopics(Map data, Hashtable environment) + { + for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) + { + Map.Entry entry = (Map.Entry) iter.next(); + String key = entry.getKey().toString(); + if (key.startsWith(TOPIC_PREFIX)) + { + String jndiName = key.substring(TOPIC_PREFIX.length()); + Topic t = createTopic(entry.getValue().toString()); + if (t != null) + { + data.put(jndiName, t); + } + } + } + } + + /** + * Factory method to create new Connection Factory instances + */ + protected ConnectionFactory createFactory(String url) + { + try + { + return new AMQConnectionFactory(url); + } + catch (URLSyntaxException urlse) + { + _logger.warn("Unable to createFactories:" + urlse); + } + return null; + } + + /** + * Factory method to create new Destination instances from an AMQP BindingURL + */ + protected Destination createDestination(String bindingURL) + { + AMQBindingURL binding; + try + { + binding = new AMQBindingURL(bindingURL); + } + catch (URLSyntaxException urlse) + { + _logger.warn("Unable to destination:" + urlse); + return null; + } + + if (binding.getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + return createTopic(binding); + } + else if (binding.getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) + { + return createQueue(binding); + } + else if (binding.getExchangeClass().equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS)) + { + return createHeaderExchange(binding); + } + + _logger.warn("Binding: '" + binding + "' not supported"); + return null; + } + + /** + * Factory method to create new Queue instances + */ + protected Queue createQueue(Object value) + { + if (value instanceof String) + + { + return new AMQQueue((String) value); + } + else if (value instanceof BindingURL) + + { + return new AMQQueue((BindingURL) value); + } + + return null; + } + + /** + * Factory method to create new Topic instances + */ + protected Topic createTopic(Object value) + { + if (value instanceof String) + { + return new AMQTopic((String) value); + } + else if (value instanceof BindingURL) + + { + return new AMQTopic((BindingURL) value); + } + + return null; + } + + /** + * Factory method to create new HeaderExcahnge instances + */ + protected Destination createHeaderExchange(Object value) + { + if (value instanceof String) + { + return new AMQHeadersExchange((String) value); + } + else if (value instanceof BindingURL) + { + return new AMQHeadersExchange((BindingURL) value); + } + + return null; + } + + // Properties + //------------------------------------------------------------------------- + public String getConnectionPrefix() + { + return CONNECTION_FACTORY_PREFIX; + } + + public void setConnectionPrefix(String connectionPrefix) + { + this.CONNECTION_FACTORY_PREFIX = connectionPrefix; + } + + public String getDestinationPrefix() + { + return DESTINATION_PREFIX; + } + + public void setDestinationPrefix(String destinationPrefix) + { + this.DESTINATION_PREFIX = destinationPrefix; + } + + public String getQueuePrefix() + { + return QUEUE_PREFIX; + } + + public void setQueuePrefix(String queuePrefix) + { + this.QUEUE_PREFIX = queuePrefix; + } + + public String getTopicPrefix() + { + return TOPIC_PREFIX; + } + + public void setTopicPrefix(String topicPrefix) + { + this.TOPIC_PREFIX = topicPrefix; + } +} diff --git a/java/client/src/org/apache/qpid/jndi/ReadOnlyContext.java b/java/client/src/org/apache/qpid/jndi/ReadOnlyContext.java new file mode 100644 index 0000000000..b721aefc87 --- /dev/null +++ b/java/client/src/org/apache/qpid/jndi/ReadOnlyContext.java @@ -0,0 +1,497 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.jndi; + +import javax.naming.*; +import javax.naming.spi.NamingManager; +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; + +/** + * Based on class from ActiveMQ. + * A read-only Context + * <p/> + * This version assumes it and all its subcontext are read-only and any attempt + * to modify (e.g. through bind) will result in an OperationNotSupportedException. + * Each Context in the tree builds a cache of the entries in all sub-contexts + * to optimise the performance of lookup. + * </p> + * <p>This implementation is intended to optimise the performance of lookup(String) + * to about the level of a HashMap get. It has been observed that the scheme + * resolution phase performed by the JVM takes considerably longer, so for + * optimum performance lookups should be coded like:</p> + * <code> + * Context componentContext = (Context)new InitialContext().lookup("java:comp"); + * String envEntry = (String) componentContext.lookup("env/myEntry"); + * String envEntry2 = (String) componentContext.lookup("env/myEntry2"); + * </code> + * + * @version $Revision: 1.2 $ $Date: 2005/08/27 03:52:39 $ + */ +public class ReadOnlyContext implements Context, Serializable +{ + private static final long serialVersionUID = -5754338187296859149L; + protected static final NameParser nameParser = new NameParserImpl(); + + protected final Hashtable environment; // environment for this context + protected final Map bindings; // bindings at my level + protected final Map treeBindings; // all bindings under me + + private boolean frozen = false; + private String nameInNamespace = ""; + public static final String SEPARATOR = "/"; + + public ReadOnlyContext() + { + environment = new Hashtable(); + bindings = new HashMap(); + treeBindings = new HashMap(); + } + + public ReadOnlyContext(Hashtable env) + { + if (env == null) + { + this.environment = new Hashtable(); + } + else + { + this.environment = new Hashtable(env); + } + this.bindings = Collections.EMPTY_MAP; + this.treeBindings = Collections.EMPTY_MAP; + } + + public ReadOnlyContext(Hashtable environment, Map bindings) + { + if (environment == null) + { + this.environment = new Hashtable(); + } + else + { + this.environment = new Hashtable(environment); + } + this.bindings = bindings; + treeBindings = new HashMap(); + frozen = true; + } + + public ReadOnlyContext(Hashtable environment, Map bindings, String nameInNamespace) + { + this(environment, bindings); + this.nameInNamespace = nameInNamespace; + } + + protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env) + { + this.bindings = clone.bindings; + this.treeBindings = clone.treeBindings; + this.environment = new Hashtable(env); + } + + protected ReadOnlyContext(ReadOnlyContext clone, Hashtable env, String nameInNamespace) + { + this(clone, env); + this.nameInNamespace = nameInNamespace; + } + + public void freeze() + { + frozen = true; + } + + boolean isFrozen() + { + return frozen; + } + + /** + * internalBind is intended for use only during setup or possibly by suitably synchronized superclasses. + * It binds every possible lookup into a map in each context. To do this, each context + * strips off one name segment and if necessary creates a new context for it. Then it asks that context + * to bind the remaining name. It returns a map containing all the bindings from the next context, plus + * the context it just created (if it in fact created it). (the names are suitably extended by the segment + * originally lopped off). + * + * @param name + * @param value + * @return + * @throws javax.naming.NamingException + */ + protected Map internalBind(String name, Object value) throws NamingException + { + assert name != null && name.length() > 0; + assert!frozen; + + Map newBindings = new HashMap(); + int pos = name.indexOf('/'); + if (pos == -1) + { + if (treeBindings.put(name, value) != null) + { + throw new NamingException("Something already bound at " + name); + } + bindings.put(name, value); + newBindings.put(name, value); + } + else + { + String segment = name.substring(0, pos); + assert segment != null; + assert!segment.equals(""); + Object o = treeBindings.get(segment); + if (o == null) + { + o = newContext(); + treeBindings.put(segment, o); + bindings.put(segment, o); + newBindings.put(segment, o); + } + else if (!(o instanceof ReadOnlyContext)) + { + throw new NamingException("Something already bound where a subcontext should go"); + } + ReadOnlyContext readOnlyContext = (ReadOnlyContext) o; + String remainder = name.substring(pos + 1); + Map subBindings = readOnlyContext.internalBind(remainder, value); + for (Iterator iterator = subBindings.entrySet().iterator(); iterator.hasNext();) + { + Map.Entry entry = (Map.Entry) iterator.next(); + String subName = segment + "/" + (String) entry.getKey(); + Object bound = entry.getValue(); + treeBindings.put(subName, bound); + newBindings.put(subName, bound); + } + } + return newBindings; + } + + protected ReadOnlyContext newContext() + { + return new ReadOnlyContext(); + } + + public Object addToEnvironment(String propName, Object propVal) throws NamingException + { + return environment.put(propName, propVal); + } + + public Hashtable getEnvironment() throws NamingException + { + return (Hashtable) environment.clone(); + } + + public Object removeFromEnvironment(String propName) throws NamingException + { + return environment.remove(propName); + } + + public Object lookup(String name) throws NamingException + { + if (name.length() == 0) + { + return this; + } + Object result = treeBindings.get(name); + if (result == null) + { + result = bindings.get(name); + } + if (result == null) + { + int pos = name.indexOf(':'); + if (pos > 0) + { + String scheme = name.substring(0, pos); + Context ctx = NamingManager.getURLContext(scheme, environment); + if (ctx == null) + { + throw new NamingException("scheme " + scheme + " not recognized"); + } + return ctx.lookup(name); + } + else + { + // Split out the first name of the path + // and look for it in the bindings map. + CompositeName path = new CompositeName(name); + + if (path.size() == 0) + { + return this; + } + else + { + String first = path.get(0); + Object obj = bindings.get(first); + if (obj == null) + { + throw new NameNotFoundException(name); + } + else if (obj instanceof Context && path.size() > 1) + { + Context subContext = (Context) obj; + obj = subContext.lookup(path.getSuffix(1)); + } + return obj; + } + } + } + if (result instanceof LinkRef) + { + LinkRef ref = (LinkRef) result; + result = lookup(ref.getLinkName()); + } + if (result instanceof Reference) + { + try + { + result = NamingManager.getObjectInstance(result, null, null, this.environment); + } + catch (NamingException e) + { + throw e; + } + catch (Exception e) + { + throw(NamingException) new NamingException("could not look up : " + name).initCause(e); + } + } + if (result instanceof ReadOnlyContext) + { + String prefix = getNameInNamespace(); + if (prefix.length() > 0) + { + prefix = prefix + SEPARATOR; + } + result = new ReadOnlyContext((ReadOnlyContext) result, environment, prefix + name); + } + return result; + } + + public Object lookup(Name name) throws NamingException + { + return lookup(name.toString()); + } + + public Object lookupLink(String name) throws NamingException + { + return lookup(name); + } + + public Name composeName(Name name, Name prefix) throws NamingException + { + Name result = (Name) prefix.clone(); + result.addAll(name); + return result; + } + + public String composeName(String name, String prefix) throws NamingException + { + CompositeName result = new CompositeName(prefix); + result.addAll(new CompositeName(name)); + return result.toString(); + } + + public NamingEnumeration list(String name) throws NamingException + { + Object o = lookup(name); + if (o == this) + { + return new ListEnumeration(); + } + else if (o instanceof Context) + { + return ((Context) o).list(""); + } + else + { + throw new NotContextException(); + } + } + + public NamingEnumeration listBindings(String name) throws NamingException + { + Object o = lookup(name); + if (o == this) + { + return new ListBindingEnumeration(); + } + else if (o instanceof Context) + { + return ((Context) o).listBindings(""); + } + else + { + throw new NotContextException(); + } + } + + public Object lookupLink(Name name) throws NamingException + { + return lookupLink(name.toString()); + } + + public NamingEnumeration list(Name name) throws NamingException + { + return list(name.toString()); + } + + public NamingEnumeration listBindings(Name name) throws NamingException + { + return listBindings(name.toString()); + } + + public void bind(Name name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void bind(String name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void close() throws NamingException + { + // ignore + } + + public Context createSubcontext(Name name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public Context createSubcontext(String name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void destroySubcontext(Name name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void destroySubcontext(String name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public String getNameInNamespace() throws NamingException + { + return nameInNamespace; + } + + public NameParser getNameParser(Name name) throws NamingException + { + return nameParser; + } + + public NameParser getNameParser(String name) throws NamingException + { + return nameParser; + } + + public void rebind(Name name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void rebind(String name, Object obj) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void rename(Name oldName, Name newName) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void rename(String oldName, String newName) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void unbind(Name name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + public void unbind(String name) throws NamingException + { + throw new OperationNotSupportedException(); + } + + private abstract class LocalNamingEnumeration implements NamingEnumeration + { + private Iterator i = bindings.entrySet().iterator(); + + public boolean hasMore() throws NamingException + { + return i.hasNext(); + } + + public boolean hasMoreElements() + { + return i.hasNext(); + } + + protected Map.Entry getNext() + { + return (Map.Entry) i.next(); + } + + public void close() throws NamingException + { + } + } + + private class ListEnumeration extends LocalNamingEnumeration + { + public Object next() throws NamingException + { + return nextElement(); + } + + public Object nextElement() + { + Map.Entry entry = getNext(); + return new NameClassPair((String) entry.getKey(), entry.getValue().getClass().getName()); + } + } + + private class ListBindingEnumeration extends LocalNamingEnumeration + { + public Object next() throws NamingException + { + return nextElement(); + } + + public Object nextElement() + { + Map.Entry entry = getNext(); + return new Binding((String) entry.getKey(), entry.getValue()); + } + } +} diff --git a/java/client/test/src/org/apache/qpid/headers/MessageFactory.java b/java/client/test/src/org/apache/qpid/headers/MessageFactory.java index fcbfe8abf2..fc976766e4 100644 --- a/java/client/test/src/org/apache/qpid/headers/MessageFactory.java +++ b/java/client/test/src/org/apache/qpid/headers/MessageFactory.java @@ -17,10 +17,15 @@ */ package org.apache.qpid.headers; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.FieldTable; -import javax.jms.*; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; /** */ @@ -46,7 +51,7 @@ class MessageFactory } _session = session; _payload = new byte[payloadSize]; - for(int i = 0; i < _payload.length; i++) + for (int i = 0; i < _payload.length; i++) { _payload[i] = (byte) DATA[i % DATA.length]; } @@ -156,7 +161,7 @@ class MessageFactory private static Message setHeaders(Message m, String[] headers) throws JMSException { - for(int i = 0; i < headers.length; i++) + for (int i = 0; i < headers.length; i++) { // the value in GRM is 5 bytes m.setStringProperty(headers[i], "value"); diff --git a/java/client/test/src/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java b/java/client/test/src/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java new file mode 100644 index 0000000000..27a92d334a --- /dev/null +++ b/java/client/test/src/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java @@ -0,0 +1,110 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.test.unit.jndi; + +import junit.framework.JUnit4TestAdapter; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; +import java.util.Properties; + +public class PropertiesFileInitialContextFactoryTest +{ + InitialContextFactory contextFactory; + Properties _properties; + + @Before + public void setupProperties() + { + _properties = new Properties(); + _properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); + _properties.put("connectionfactory.local", "amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'"); + _properties.put("queue.MyQueue", "example.MyQueue"); + _properties.put("topic.ibmStocks", "stocks.nyse.ibm"); + _properties.put("destination.direct", "direct://amq.direct//directQueue"); + } + + @Test + public void test() + { + Context ctx = null; + try + { + ctx = new InitialContext(_properties); + } + catch (NamingException ne) + { + Assert.fail("Error loading context:" + ne); + } + + try + { + AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local"); + Assert.assertEquals("amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'", cf.getConnectionURL().toString()); + } + catch (NamingException ne) + { + Assert.fail("Unable to create Connection Factory:" + ne); + } + + try + { + AMQQueue queue = (AMQQueue) ctx.lookup("MyQueue"); + Assert.assertEquals("example.MyQueue", queue.getRoutingKey()); + } + catch (NamingException ne) + { + Assert.fail("Unable to create queue:" + ne); + } + + try + { + AMQTopic topic = (AMQTopic) ctx.lookup("ibmStocks"); + Assert.assertEquals("stocks.nyse.ibm", topic.getTopicName()); + } + catch (Exception ne) + { + Assert.fail("Unable to create topic:" + ne); + } + + try + { + AMQQueue direct = (AMQQueue) ctx.lookup("direct"); + Assert.assertEquals("directQueue", direct.getRoutingKey()); + } + catch (NamingException ne) + { + Assert.fail("Unable to create direct destination:" + ne); + } + + + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(PropertiesFileInitialContextFactoryTest.class); + } +} diff --git a/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java b/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java index 86744d162f..200c89bebe 100644 --- a/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java +++ b/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java @@ -120,6 +120,7 @@ public class TransactedTest expect("Z", testConsumer2.receive(1000)); assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); } @Test @@ -140,6 +141,7 @@ public class TransactedTest expect("B", consumer.receive(1000)); expect("C", consumer.receive(1000)); + assertTrue(null == testConsumer1.receive(1000)); assertTrue(null == testConsumer2.receive(1000)); } diff --git a/java/common/bin/qpid-run b/java/common/bin/qpid-run index a45b5a9692..7db32acebb 100644 --- a/java/common/bin/qpid-run +++ b/java/common/bin/qpid-run @@ -15,6 +15,12 @@ # limitations under the License. # +# Test if we're running on cygwin. +cygwin=false +if [[ "$(uname -a | fgrep Cygwin)" != "" ]]; then + cygwin=true +fi + die() { if [[ $1 = -usage ]]; then shift @@ -41,6 +47,42 @@ if [ -z "$QPID_WORK" ]; then QPID_WORK=$HOME fi +if $cygwin; then + QPID_HOME=$(cygpath -w $QPID_HOME) + QPID_WORK=$(cygpath -w $QPID_WORK) +fi + +#Set the default system properties that we'll use now that they have +#all been initialised +SYSTEM_PROPS="-Damqj.logging.level=$AMQJ_LOGGING_LEVEL -DQPID_HOME=$QPID_HOME -DQPID_WORK=$QPID_WORK" + +#If logprefix or logsuffix set to use PID make that happen +#Otherwise just pass the value through for these props +#Using X character to avoid probs with empty strings +if [ -n "$QPID_LOG_PREFIX" ]; then + if [ "X$QPID_LOG_PREFIX" = "XPID" ]; then + echo Using pid in qpid log name prefix + LOG_PREFIX=" -Dlogprefix=$$" + else + echo Using qpid logprefix property + LOG_PREFIX=" -Dlogprefix=$QPID_LOG_PREFIX" + fi + SYSTEM_PROPS+=$LOG_PREFIX +fi + +if [ -n "$QPID_LOG_SUFFIX" ]; then + if [ "X$QPID_LOG_SUFFIX" = "XPID" ]; then + echo Using pid in qpid log name suffix + LOG_SUFFIX=" -Dlogsuffix=$$" + else + echo Using qpig logsuffix property + LOG_SUFFIX=" -Dlogsuffix=$QPID_LOG_SUFFIX" + fi + SYSTEM_PROPS+=$LOG_SUFFIX +fi + +echo System Properties set to $SYSTEM_PROPS + program=$(basename $0) sourced=${BASH_SOURCE[0]} if [[ -z ${sourced:-''} ]]; then @@ -58,11 +100,6 @@ usage() { sed "s/^--$//" } -cygwin=false -if [[ "$(uname -a | fgrep Cygwin)" != "" ]]; then - cygwin=true -fi - export EXTERNAL_CLASSPATH=$CLASSPATH unset CLASSPATH @@ -175,12 +212,10 @@ for arg in "${RUN_ARGS[@]}"; do done if $cygwin; then - QPID_HOME=$(cygpath -w $QPID_HOME) CLASSPATH=$(cygpath -w -p $CLASSPATH) - QPID_WORK=$(cygpath -w $QPID_WORK) JAVA=$(cygpath -u $JAVA) fi -COMMAND=($JAVA $JAVA_VM $JAVA_MEM -Damqj.logging.level=$AMQJ_LOGGING_LEVEL -DQPID_HOME=$QPID_HOME -DQPID_WORK=$QPID_WORK $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}") +COMMAND=($JAVA $JAVA_VM $JAVA_MEM $SYSTEM_PROPS $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}") DISPATCH diff --git a/java/common/src/org/apache/qpid/framing/ContentBody.java b/java/common/src/org/apache/qpid/framing/ContentBody.java index a345d1d225..d7b668534c 100644 --- a/java/common/src/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/org/apache/qpid/framing/ContentBody.java @@ -32,7 +32,7 @@ public class ContentBody extends AMQBody public int getSize() { - return (payload == null?0:payload.limit()); + return (payload == null ? 0 : payload.limit()); } public void writePayload(ByteBuffer buffer) @@ -49,8 +49,27 @@ public class ContentBody extends AMQBody if (size > 0) { payload = buffer.slice(); - payload.limit((int)size); - buffer.skip((int)size); + payload.limit((int) size); + buffer.skip((int) size); + } + + } + + public void reduceBufferToFit() + { + if (payload != null && (payload.remaining() < payload.capacity() / 2)) + { + int size = payload.limit(); + ByteBuffer newPayload = ByteBuffer.allocate(size); + + newPayload.put(payload); + newPayload.position(0); + newPayload.limit(size); + + //reduce reference count on payload + payload.release(); + + payload = newPayload; } } |