summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-10-19 10:11:47 +0000
committerGordon Sim <gsim@apache.org>2006-10-19 10:11:47 +0000
commit0c7aaaaab89dff7f0843cbcdf40791e4d17255bb (patch)
tree6eecd25a9ba32c11fdffe69f7508b222e486814a
parent025451b1e26c48ca58c388921827929d11c9459c (diff)
downloadqpid-python-0c7aaaaab89dff7f0843cbcdf40791e4d17255bb.tar.gz
Further fixes and some extra tests for transactions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@465549 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/org/apache/qpid/server/AMQChannel.java180
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/TxAck.java129
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java48
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java27
-rw-r--r--java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java81
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQMessage.java13
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQQueue.java24
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java32
-rw-r--r--java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java116
-rw-r--r--java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java186
-rw-r--r--java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java34
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/AckTest.java37
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java95
-rw-r--r--java/broker/test/src/org/apache/qpid/server/util/UnitTests.java2
-rw-r--r--java/client/test/src/org/apache/qpid/transacted/TransactedTest.java2
15 files changed, 668 insertions, 338 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java
index 78889e69b5..055af16159 100644
--- a/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -23,6 +23,10 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.ack.TxAck;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
@@ -31,7 +35,6 @@ import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
-import org.apache.qpid.server.util.OrderedMapHelper;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
@@ -93,36 +96,17 @@ public class AMQChannel
private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+ private long _lastDeliveryTag;
+
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private final MessageRouter _exchanges;
private final TxnBuffer _txnBuffer;
- private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
-
- public static class UnacknowledgedMessage
- {
- public final AMQMessage message;
- public final String consumerTag;
- public AMQQueue queue;
+ private TxAck ackOp;
- public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag)
- {
- this.queue = queue;
- this.message = message;
- this.consumerTag = consumerTag;
- }
-
- private void discard() throws AMQException
- {
- if (queue != null)
- {
- message.dequeue(queue);
- }
- message.decrementReference();
- }
- }
+ private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
@@ -352,7 +336,8 @@ public class AMQChannel
{
synchronized(_unacknowledgedMessageMapLock)
{
- _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag));
+ _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+ _lastDeliveryTag = deliveryTag;
checkSuspension();
}
}
@@ -444,13 +429,32 @@ public class AMQChannel
{
if (_transactional)
{
- try
+ //check that the tag exists to give early failure
+ if(!multiple || deliveryTag > 0)
{
- _txnBuffer.enlist(new Ack(getUnackedMessageFinder().getValues(deliveryTag, multiple)));
+ checkAck(deliveryTag);
}
- catch(NoSuchElementException e)
+ //we use a single txn op for all acks and update this op
+ //as new acks come in. If this is the first ack in the txn
+ //we will need to create and enlist the op.
+ if(ackOp == null)
{
- throw new AMQException("Received ack for unrecognised delivery tag: " + deliveryTag);
+ ackOp = new TxAck(new AckMap());
+ _txnBuffer.enlist(ackOp);
+ }
+ //update the op to include this ack request
+ if(multiple && deliveryTag == 0)
+ {
+ synchronized(_unacknowledgedMessageMapLock)
+ {
+ //if have signalled to ack all, that refers only
+ //to all at this time
+ ackOp.update(_lastDeliveryTag, multiple);
+ }
+ }
+ else
+ {
+ ackOp.update(deliveryTag, multiple);
}
}
else
@@ -459,6 +463,17 @@ public class AMQChannel
}
}
+ private void checkAck(long deliveryTag) throws AMQException
+ {
+ synchronized(_unacknowledgedMessageMapLock)
+ {
+ if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
+ {
+ throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
+ }
+ }
+ }
+
private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
{
if (multiple)
@@ -587,6 +602,16 @@ public class AMQChannel
public void commit() throws AMQException
{
+ if(ackOp != null)
+ {
+ ackOp.consolidate();
+ if(ackOp.checkPersistent())
+ {
+ _txnBuffer.containsPersistentChanges();
+ }
+ ackOp = null;//already enlisted, after commit will reset regardless of outcome
+ }
+
_txnBuffer.commit();
//TODO: may need to return 'immediate' messages at this point
}
@@ -636,69 +661,22 @@ public class AMQChannel
_returns.clear();
}
- private OrderedMapHelper<Long, UnacknowledgedMessage> getUnackedMessageFinder()
- {
- return new OrderedMapHelper<Long, UnacknowledgedMessage>(_unacknowledgedMessageMap, _unacknowledgedMessageMapLock, 0L);
- }
-
-
- private class Ack implements TxnOp
+ //we use this wrapper to ensure we are always using the correct
+ //map instance (its not final unfortunately)
+ private class AckMap implements UnacknowledgedMessageMap
{
- private final Map<Long, UnacknowledgedMessage> _unacked;
-
- Ack(Map<Long, UnacknowledgedMessage> unacked) throws AMQException
+ public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
{
- _unacked = unacked;
-
- //if any of the messages in unacked are persistent the txn
- //buffer must be marked as persistent:
- for(UnacknowledgedMessage msg : _unacked.values())
- {
- if(msg.message.isPersistent())
- {
- _txnBuffer.containsPersistentChanges();
- break;
- }
- }
+ impl().collect(deliveryTag, multiple, msgs);
}
-
- public void prepare() throws AMQException
+ public void remove(List<UnacknowledgedMessage> msgs)
{
- //make persistent changes, i.e. dequeue and decrementReference
- for(UnacknowledgedMessage msg : _unacked.values())
- {
- msg.discard();
- }
- }
-
- public void undoPrepare()
- {
- //decrementReference is annoyingly untransactional (due to
- //in memory counter) so if we failed in prepare for full
- //txn, this op will have to compensate by fixing the count
- //in memory (persistent changes will be rolled back by store)
- for(UnacknowledgedMessage msg : _unacked.values())
- {
-
- msg.message.incrementReference();
- }
- }
-
- public void commit()
- {
- //remove the unacked messages from the channels map
- synchronized(_unacknowledgedMessageMapLock)
- {
- for(long tag : _unacked.keySet())
- {
- _unacknowledgedMessageMap.remove(tag);
- }
- }
-
+ impl().remove(msgs);
}
- public void rollback()
+ private UnacknowledgedMessageMap impl()
{
+ return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap);
}
}
@@ -745,17 +723,6 @@ public class AMQChannel
public void prepare() throws AMQException
{
- //the routers reference can now be released
- _msg.decrementReference();
- try
- {
- _msg.checkDeliveredToConsumer();
- }
- catch(NoConsumersException e)
- {
- //TODO: store this for delivery after the commit-ok
- _returns.add(e.getReturnMessage(_channelId));
- }
}
public void undoPrepare()
@@ -767,6 +734,27 @@ public class AMQChannel
public void commit()
{
+ //The routers reference can now be released. This is done
+ //here to ensure that it happens after the queues that
+ //enqueue it have incremented their counts (which as a
+ //memory only operation is done in the commit phase).
+ try
+ {
+ _msg.decrementReference();
+ }
+ catch(AMQException e)
+ {
+ _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
+ }
+ try
+ {
+ _msg.checkDeliveredToConsumer();
+ }
+ catch(NoConsumersException e)
+ {
+ //TODO: store this for delivery after the commit-ok
+ _returns.add(e.getReturnMessage(_channelId));
+ }
}
public void rollback()
diff --git a/java/broker/src/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/org/apache/qpid/server/ack/TxAck.java
new file mode 100644
index 0000000000..0d502a8f6e
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/ack/TxAck.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.txn.TxnOp;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A TxnOp implementation for handling accumulated acks
+ */
+public class TxAck implements TxnOp
+{
+ private final UnacknowledgedMessageMap _map;
+ private final List <UnacknowledgedMessage> _unacked = new LinkedList<UnacknowledgedMessage>();
+ private final List<Long> _individual = new LinkedList<Long>();
+ private long _deliveryTag;
+ private boolean _multiple;
+
+ public TxAck(UnacknowledgedMessageMap map)
+ {
+ _map = map;
+ }
+
+ public void update(long deliveryTag, boolean multiple)
+ {
+ if(!multiple)
+ {
+ //have acked a single message that is not part of
+ //the previously acked region so record
+ //individually
+ _individual.add(deliveryTag);//_multiple && !multiple
+ }
+ else if(deliveryTag > _deliveryTag)
+ {
+ //have simply moved the last acked message on a
+ //bit
+ _deliveryTag = deliveryTag;
+ _multiple = true;
+ }
+ }
+
+ public void consolidate()
+ {
+ //lookup all the unacked messages that have been acked in this transaction
+ if(_multiple)
+ {
+ //get all the unacked messages for the accumulated
+ //multiple acks
+ _map.collect(_deliveryTag, true, _unacked);
+ }
+ //get any unacked messages for individual acks outside the
+ //range covered by multiple acks
+ for(long tag : _individual)
+ {
+ if(_deliveryTag < tag)
+ {
+ _map.collect(tag, false, _unacked);
+ }
+ }
+ }
+
+ public boolean checkPersistent() throws AMQException
+ {
+ //if any of the messages in unacked are persistent the txn
+ //buffer must be marked as persistent:
+ for(UnacknowledgedMessage msg : _unacked)
+ {
+ if(msg.message.isPersistent())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void prepare() throws AMQException
+ {
+ //make persistent changes, i.e. dequeue and decrementReference
+ for(UnacknowledgedMessage msg : _unacked)
+ {
+ msg.discard();
+ }
+ }
+
+ public void undoPrepare()
+ {
+ //decrementReference is annoyingly untransactional (due to
+ //in memory counter) so if we failed in prepare for full
+ //txn, this op will have to compensate by fixing the count
+ //in memory (persistent changes will be rolled back by store)
+ for(UnacknowledgedMessage msg : _unacked)
+ {
+ msg.message.incrementReference();
+ }
+ }
+
+ public void commit()
+ {
+ //remove the unacked messages from the channels map
+ _map.remove(_unacked);
+ }
+
+ public void rollback()
+ {
+ }
+}
+
diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java
new file mode 100644
index 0000000000..f13ecef7ac
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class UnacknowledgedMessage
+{
+ public final AMQMessage message;
+ public final String consumerTag;
+ public final long deliveryTag;
+ public AMQQueue queue;
+
+ public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag, long deliveryTag)
+ {
+ this.queue = queue;
+ this.message = message;
+ this.consumerTag = consumerTag;
+ this.deliveryTag = deliveryTag;
+ }
+
+ public void discard() throws AMQException
+ {
+ if (queue != null)
+ {
+ message.dequeue(queue);
+ }
+ message.decrementReference();
+ }
+}
+
diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
new file mode 100644
index 0000000000..ccb55f7e2d
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import java.util.List;
+
+public interface UnacknowledgedMessageMap
+{
+ public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
+ public void remove(List<UnacknowledgedMessage> msgs);
+}
+
diff --git a/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
new file mode 100644
index 0000000000..117793930f
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import java.util.List;
+import java.util.Map;
+
+public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
+{
+ private final Object _lock;
+ private Map<Long, UnacknowledgedMessage> _map;
+
+ public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
+ {
+ _lock = lock;
+ _map = map;
+ }
+
+ public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+ {
+ if (multiple)
+ {
+ collect(deliveryTag, msgs);
+ }
+ else
+ {
+ msgs.add(get(deliveryTag));
+ }
+
+ }
+
+ public void remove(List<UnacknowledgedMessage> msgs)
+ {
+ synchronized(_lock)
+ {
+ for(UnacknowledgedMessage msg : msgs)
+ {
+ _map.remove(msg.deliveryTag);
+ }
+ }
+ }
+
+ private UnacknowledgedMessage get(long key)
+ {
+ synchronized(_lock)
+ {
+ return _map.get(key);
+ }
+ }
+
+ private void collect(long key, List<UnacknowledgedMessage> msgs)
+ {
+ synchronized(_lock)
+ {
+ for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+ {
+ msgs.add(entry.getValue());
+ if (entry.getKey() == key)
+ {
+ break;
+ }
+ }
+ }
+ }
+}
+
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
index 6573be782b..7666b5b3f8 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
@@ -255,7 +255,7 @@ public class AMQMessage
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
*/
- public void decrementReference() throws AMQException
+ public void decrementReference() throws MessageCleanupException
{
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
@@ -263,7 +263,16 @@ public class AMQMessage
// not relying on the all the increments having taken place before the delivery manager decrements.
if (_referenceCount.decrementAndGet() == 0)
{
- _store.removeMessage(_messageId);
+ try
+ {
+ _store.removeMessage(_messageId);
+ }
+ catch(AMQException e)
+ {
+ //to maintain consistency, we revert the count
+ incrementReference();
+ throw new MessageCleanupException(_messageId, e);
+ }
}
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
index 6c795590f6..7fb6ddcb5c 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
@@ -707,8 +707,16 @@ public class AMQQueue implements Managable
{
try
{
- msg.decrementReference();
msg.dequeue(this);
+ msg.decrementReference();
+ }
+ catch(MessageCleanupException e)
+ {
+ //Message was dequeued, but could notthen be deleted
+ //though it is no longer referenced. This should be very
+ //rare and can be detected and cleaned up on recovery or
+ //done through some form of manual intervention.
+ _logger.error(e, e);
}
catch(AMQException e)
{
@@ -777,7 +785,8 @@ public class AMQQueue implements Managable
public void prepare() throws AMQException
{
- record(_msg);
+ //do the persistent part of the record()
+ _msg.enqueue(AMQQueue.this);
}
public void undoPrepare()
@@ -786,6 +795,9 @@ public class AMQQueue implements Managable
public void commit()
{
+ //do the memeory part of the record()
+ _msg.incrementReference();
+ //then process the message
try
{
process(_msg);
@@ -799,14 +811,6 @@ public class AMQQueue implements Managable
public void rollback()
{
- try
- {
- _msg.decrementReference();
- }
- catch (AMQException e)
- {
- _logger.error("Error rolling back a queue delivery: " + e, e);
- }
}
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java
new file mode 100644
index 0000000000..6f0fa285b1
--- /dev/null
+++ b/java/broker/src/org/apache/qpid/server/queue/MessageCleanupException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Signals that the removal of a message once its refcount reached
+ * zero failed.
+ */
+public class MessageCleanupException extends AMQException
+{
+ public MessageCleanupException(long messageId, AMQException e)
+ {
+ super("Failed to cleanup message with id " + messageId, e);
+ }
+}
diff --git a/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java b/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java
deleted file mode 100644
index 2c24944317..0000000000
--- a/java/broker/src/org/apache/qpid/server/util/OrderedMapHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import java.util.List;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-/**
- * Utility class used by AMQChannel to retrieve unacknowledged
- * messages. Made generic to avoid exposing the inner class in
- * AMQChannel. Put in this package to keep ot out the way.
- */
-public class OrderedMapHelper<K, V>
-{
- private final Map<K, V> _map;
- private final Object _lock;
- private final K _wildcard;
-
- public OrderedMapHelper(Map<K, V> map, Object lock, K wildcard)
- {
- _map = map;
- _lock = lock;
- _wildcard = wildcard;
- }
-
- /**
- * Assumes the map passed in is ordered. Returns a copy of the
- * map containing an individual key-value pair or a list of
- * key-values upto and including the one matching the given
- * key. If multiple == true and the key == the wildcard specified
- * on construction, then all the values in the map will be
- * returned.
- */
- public Map<K, V> getValues(K key, boolean multiple) throws NoSuchElementException
- {
- if (multiple)
- {
- if(key == _wildcard)
- {
- synchronized(_lock)
- {
- return new LinkedHashMap<K, V>(_map);
- }
- }
- else
- {
- return getValues(key);
- }
- }
- else
- {
- Map<K, V> values = new LinkedHashMap<K, V>();
- values.put(key, getValue(key));
- return values;
- }
- }
-
- private V getValue(K key) throws NoSuchElementException
- {
- V value;
- synchronized(_lock)
- {
- value = _map.get(key);
- }
-
- if(value == null)
- {
- throw new NoSuchElementException();
- }
- else
- {
- return value;
- }
- }
-
- private Map<K, V> getValues(K key) throws NoSuchElementException
- {
- Map<K, V> values = new LinkedHashMap<K, V>();
- synchronized(_lock)
- {
- if (!_map.containsKey(key))
- {
- throw new NoSuchElementException();
- }
-
- for(Map.Entry<K, V> entry : _map.entrySet())
- {
- values.put(entry.getKey(), entry.getValue());
- if (entry.getKey() == key)
- {
- break;
- }
- }
- }
- return values;
- }
-
-} \ No newline at end of file
diff --git a/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
new file mode 100644
index 0000000000..4bf23ec25a
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+public class TxAckTest
+{
+ private Scenario individual;
+ private Scenario multiple;
+ private Scenario combined;
+
+ @Before
+ public void setup() throws Exception
+ {
+ //ack only 5th msg
+ individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l));
+ individual.update(5, false);
+
+ //ack all up to and including 5th msg
+ multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l));
+ multiple.update(5, true);
+
+ //leave only 8th and 9th unacked
+ combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l));
+ combined.update(3, false);
+ combined.update(5, true);
+ combined.update(7, true);
+ combined.update(2, true);//should be ignored
+ combined.update(1, false);//should be ignored
+ combined.update(10, false);
+ }
+
+ @Test
+ public void prepare() throws AMQException
+ {
+ individual.prepare();
+ multiple.prepare();
+ combined.prepare();
+ }
+
+ @Test
+ public void undoPrepare() throws AMQException
+ {
+ individual.undoPrepare();
+ multiple.undoPrepare();
+ combined.undoPrepare();
+ }
+
+ @Test
+ public void commit() throws AMQException
+ {
+ individual.commit();
+ multiple.commit();
+ combined.commit();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(TxAckTest.class);
+ }
+
+ private class Scenario
+ {
+ private final LinkedHashMap<Long, UnacknowledgedMessage> _messages = new LinkedHashMap<Long, UnacknowledgedMessage>();
+ private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(_messages, _messages);
+ private final TxAck _op = new TxAck(_map);
+ private final List<Long> _acked;
+ private final List<Long> _unacked;
+
+ Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+ {
+ for(int i = 0; i < messageCount; i++)
+ {
+ long deliveryTag = i + 1;
+ _messages.put(deliveryTag, new UnacknowledgedMessage(null, new TestMessage(deliveryTag), null, deliveryTag));
+ }
+ _acked = acked;
+ _unacked = unacked;
+ }
+
+ void update(long deliverytag, boolean multiple)
+ {
+ _op.update(deliverytag, multiple);
+ }
+
+ private void assertCount(List<Long> tags, int expected)
+ {
+ for(long tag : tags)
+ {
+ UnacknowledgedMessage u = _messages.get(tag);
+ assertTrue("Message not found for tag " + tag, u != null);
+ ((TestMessage) u.message).assertCountEquals(expected);
+ }
+ }
+
+ void prepare() throws AMQException
+ {
+ _op.consolidate();
+ _op.prepare();
+
+ assertCount(_acked, -1);
+ assertCount(_unacked, 0);
+
+ }
+ void undoPrepare()
+ {
+ _op.consolidate();
+ _op.undoPrepare();
+
+ assertCount(_acked, 1);
+ assertCount(_unacked, 0);
+ }
+
+ void commit()
+ {
+ _op.consolidate();
+ _op.commit();
+
+
+ //check acked messages are removed from map
+ HashSet<Long> keys = new HashSet<Long>(_messages.keySet());
+ keys.retainAll(_acked);
+ assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
+ //check unacked messages are still in map
+ keys = new HashSet<Long>(_unacked);
+ keys.removeAll(_messages.keySet());
+ assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
+ }
+ }
+
+ private class TestMessage extends AMQMessage
+ {
+ private final long _tag;
+ private int _count;
+
+ TestMessage(long tag)
+ {
+ super(new TestableMemoryMessageStore(), null);
+ _tag = tag;
+ }
+
+ public void incrementReference()
+ {
+ _count++;
+ }
+
+ public void decrementReference()
+ {
+ _count--;
+ }
+
+ void assertCountEquals(int expected)
+ {
+ assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ }
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java
new file mode 100644
index 0000000000..3d7db01365
--- /dev/null
+++ b/java/broker/test/src/org/apache/qpid/server/ack/UnitTests.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ TxAckTest.class
+})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
index 8ce006b3ae..f22ccb8caf 100644
--- a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
+++ b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
@@ -93,15 +94,15 @@ public class AckTest
final int msgCount = 10;
publishMessages(msgCount);
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount);
- Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
for (int i = 1; i <= map.size(); i++)
{
- Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
assertTrue(entry.getKey() == i);
- AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ UnacknowledgedMessage unackedMsg = entry.getValue();
assertTrue(unackedMsg.queue == _queue);
}
assertTrue(_messageStore.getMessageMap().size() == msgCount);
@@ -118,7 +119,7 @@ public class AckTest
final int msgCount = 10;
publishMessages(msgCount);
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
assertTrue(_messageStore.getMessageMap().size() == 0);
}
@@ -135,16 +136,16 @@ public class AckTest
publishMessages(msgCount);
_channel.acknowledgeMessage(5, false);
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount - 1);
- Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
int i = 1;
while (i <= map.size())
{
- Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
assertTrue(entry.getKey() == i);
- AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ UnacknowledgedMessage unackedMsg = entry.getValue();
assertTrue(unackedMsg.queue == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
@@ -166,16 +167,16 @@ public class AckTest
publishMessages(msgCount);
_channel.acknowledgeMessage(5, true);
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
- Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
int i = 1;
while (i <= map.size())
{
- Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
assertTrue(entry.getKey() == i + 5);
- AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ UnacknowledgedMessage unackedMsg = entry.getValue();
assertTrue(unackedMsg.queue == _queue);
++i;
}
@@ -193,16 +194,16 @@ public class AckTest
publishMessages(msgCount);
_channel.acknowledgeMessage(0, true);
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
int i = 1;
while (i <= map.size())
{
- Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
assertTrue(entry.getKey() == i + 5);
- AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ UnacknowledgedMessage unackedMsg = entry.getValue();
assertTrue(unackedMsg.queue == _queue);
++i;
}
@@ -221,7 +222,7 @@ public class AckTest
// at this point we should have sent out only 5 messages with a further 5 queued
// up in the channel which should be suspended
assertTrue(_subscription.isSuspended());
- Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
_channel.acknowledgeMessage(5, true);
assertTrue(!_subscription.isSuspended());
diff --git a/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java b/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java
deleted file mode 100644
index 74340e98c0..0000000000
--- a/java/broker/test/src/org/apache/qpid/server/util/OrderedMapHelperTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import junit.framework.JUnit4TestAdapter;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.Ignore;
-import org.apache.qpid.AMQException;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class OrderedMapHelperTest
-{
- private final Object lock = new Object();
- private final Map<Integer, String> map = new LinkedHashMap<Integer, String>();
- private final OrderedMapHelper<Integer, String> helper = new OrderedMapHelper<Integer, String>(map, lock, 0);
-
- @Before
- public void setup() throws Exception
- {
- map.put(1, "One");
- map.put(2, "Two");
- map.put(5, "Five");
- map.put(8, "Eight");
- map.put(10, "Ten");
- }
-
- @Test
- public void specific()
- {
- Map<Integer, String> slice = helper.getValues(5, false);
- assertEquals(1, slice.size());
- assertTrue(slice.containsKey(5));
- assertTrue(slice.containsValue("Five"));
- assertEquals("Five", slice.get(5));
- }
-
- @Test
- public void multiple()
- {
- Map<Integer, String> slice = helper.getValues(5, true);
- assertEquals(3, slice.size());
-
- assertTrue(slice.containsKey(1));
- assertTrue(slice.containsKey(2));
- assertTrue(slice.containsKey(5));
-
- assertEquals("One", slice.get(1));
- assertEquals("Two", slice.get(2));
- assertEquals("Five", slice.get(5));
- }
-
- @Test
- public void all()
- {
- Map<Integer, String> slice = helper.getValues(0/*the 'wildcard'*/, true);
- assertEquals(5, slice.size());
-
- assertTrue(slice.containsKey(1));
- assertTrue(slice.containsKey(2));
- assertTrue(slice.containsKey(5));
- assertTrue(slice.containsKey(8));
- assertTrue(slice.containsKey(10));
-
- assertEquals("One", slice.get(1));
- assertEquals("Two", slice.get(2));
- assertEquals("Five", slice.get(5));
- assertEquals("Eight", slice.get(8));
- assertEquals("Ten", slice.get(10));
- }
-
- public static junit.framework.Test suite()
- {
- return new JUnit4TestAdapter(OrderedMapHelperTest.class);
- }
-}
diff --git a/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java b/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
index 5c2242fb0d..d6cc471413 100644
--- a/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
+++ b/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
@@ -22,7 +22,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
-@Suite.SuiteClasses({LoggingProxyTest.class, OrderedMapHelperTest.class})
+@Suite.SuiteClasses({LoggingProxyTest.class})
public class UnitTests
{
public static junit.framework.Test suite()
diff --git a/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java b/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java
index 86744d162f..200c89bebe 100644
--- a/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java
+++ b/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java
@@ -120,6 +120,7 @@ public class TransactedTest
expect("Z", testConsumer2.receive(1000));
assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
}
@Test
@@ -140,6 +141,7 @@ public class TransactedTest
expect("B", consumer.receive(1000));
expect("C", consumer.receive(1000));
+ assertTrue(null == testConsumer1.receive(1000));
assertTrue(null == testConsumer2.receive(1000));
}