summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java211
1 files changed, 211 insertions, 0 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
new file mode 100644
index 0000000000..8061687175
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -0,0 +1,211 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.AMQException;
+
+import java.util.*;
+
+public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
+{
+ private final Object _lock = new Object();
+
+ private Map<Long, UnacknowledgedMessage> _map;
+
+ private long _lastDeliveryTag;
+
+ private final int _prefetchLimit;
+
+ public UnacknowledgedMessageMapImpl(int prefetchLimit)
+ {
+ _prefetchLimit = prefetchLimit;
+ _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit);
+ }
+
+ /*public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
+ {
+ _lock = lock;
+ _map = map;
+ } */
+
+ public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+ {
+ if (multiple)
+ {
+ collect(deliveryTag, msgs);
+ }
+ else
+ {
+ msgs.add(get(deliveryTag));
+ }
+
+ }
+
+ public boolean contains(long deliveryTag) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ return _map.containsKey(deliveryTag);
+ }
+ }
+
+ public void remove(List<UnacknowledgedMessage> msgs)
+ {
+ synchronized (_lock)
+ {
+ for(UnacknowledgedMessage msg : msgs)
+ {
+ _map.remove(msg.deliveryTag);
+ }
+ }
+ }
+
+ public UnacknowledgedMessage remove(long deliveryTag)
+ {
+ synchronized (_lock)
+ {
+ return _map.remove(deliveryTag);
+ }
+ }
+
+ public void visit(Visitor visitor) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ Collection<UnacknowledgedMessage> currentEntries = _map.values();
+ for (UnacknowledgedMessage msg: currentEntries)
+ {
+ visitor.callback(msg);
+ }
+ }
+ }
+
+ public void add(long deliveryTag, UnacknowledgedMessage message)
+ {
+ synchronized( _lock)
+ {
+ _map.put(deliveryTag, message);
+ _lastDeliveryTag = deliveryTag;
+ }
+ }
+
+ public Collection<UnacknowledgedMessage> cancelAllMessages()
+ {
+ synchronized (_lock)
+ {
+ Collection<UnacknowledgedMessage> currentEntries = _map.values();
+ _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit);
+ return currentEntries;
+ }
+ }
+
+ public void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext)
+ throws AMQException
+ {
+ synchronized (_lock)
+ {
+ txnContext.acknowledgeMessage(deliveryTag, _lastDeliveryTag, multiple, this);
+ }
+ }
+
+ public int size()
+ {
+ synchronized (_lock)
+ {
+ return _map.size();
+ }
+ }
+
+ public void clear()
+ {
+ synchronized (_lock)
+ {
+ _map.clear();
+ }
+ }
+
+ public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = _map.entrySet().iterator();
+ while (it.hasNext())
+ {
+ Map.Entry<Long, UnacknowledgedMessage> unacked = it.next();
+
+ if (unacked.getKey() > deliveryTag)
+ {
+ //This should not occur now.
+ throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
+ " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
+ }
+
+ it.remove();
+
+ destination.add(unacked.getValue());
+ if (unacked.getKey() == deliveryTag)
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ public void resendMessages(AMQProtocolSession protocolSession, int channelId) throws AMQException
+ {
+ synchronized (_lock)
+ {
+ for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+ {
+ long deliveryTag = entry.getKey();
+ String consumerTag = entry.getValue().consumerTag;
+ AMQMessage msg = entry.getValue().message;
+
+ msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+ }
+ }
+ }
+ private UnacknowledgedMessage get(long key)
+ {
+ synchronized (_lock)
+ {
+ return _map.get(key);
+ }
+ }
+
+ private void collect(long key, List<UnacknowledgedMessage> msgs)
+ {
+ synchronized (_lock)
+ {
+ for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+ {
+ msgs.add(entry.getValue());
+ if (entry.getKey() == key)
+ {
+ break;
+ }
+ }
+ }
+ }
+
+
+}
+