summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java')
-rw-r--r--M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java232
1 files changed, 0 insertions, 232 deletions
diff --git a/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
deleted file mode 100644
index c567387662..0000000000
--- a/M4-RCs/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.ack;
-
-import org.apache.qpid.server.store.StoreContext;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.txn.TransactionalContext;
-
-public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
-{
- private final Object _lock = new Object();
-
- private long _unackedSize;
-
- private Map<Long, QueueEntry> _map;
-
- private long _lastDeliveryTag;
-
- private final int _prefetchLimit;
-
- public UnacknowledgedMessageMapImpl(int prefetchLimit)
- {
- _prefetchLimit = prefetchLimit;
- _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
- }
-
- public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
- {
- if (multiple)
- {
- collect(deliveryTag, msgs);
- }
- else
- {
- msgs.put(deliveryTag, get(deliveryTag));
- }
-
- }
-
- public boolean contains(long deliveryTag) throws AMQException
- {
- synchronized (_lock)
- {
- return _map.containsKey(deliveryTag);
- }
- }
-
- public void remove(Map<Long,QueueEntry> msgs)
- {
- synchronized (_lock)
- {
- for (Long deliveryTag : msgs.keySet())
- {
- remove(deliveryTag);
- }
- }
- }
-
- public QueueEntry remove(long deliveryTag)
- {
- synchronized (_lock)
- {
-
- QueueEntry message = _map.remove(deliveryTag);
- if(message != null)
- {
- _unackedSize -= message.getMessage().getSize();
-
- }
-
- return message;
- }
- }
-
- public void visit(Visitor visitor) throws AMQException
- {
- synchronized (_lock)
- {
- Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
- for (Map.Entry<Long, QueueEntry> entry : currentEntries)
- {
- visitor.callback(entry.getKey().longValue(), entry.getValue());
- }
- visitor.visitComplete();
- }
- }
-
- public void add(long deliveryTag, QueueEntry message)
- {
- synchronized (_lock)
- {
- _map.put(deliveryTag, message);
- _unackedSize += message.getMessage().getSize();
- _lastDeliveryTag = deliveryTag;
- }
- }
-
- public Collection<QueueEntry> cancelAllMessages()
- {
- synchronized (_lock)
- {
- Collection<QueueEntry> currentEntries = _map.values();
- _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
- _unackedSize = 0l;
- return currentEntries;
- }
- }
-
- public void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext)
- throws AMQException
- {
- synchronized (_lock)
- {
- txnContext.acknowledgeMessage(deliveryTag, _lastDeliveryTag, multiple, this);
- }
- }
-
- public int size()
- {
- synchronized (_lock)
- {
- return _map.size();
- }
- }
-
- public void clear()
- {
- synchronized (_lock)
- {
- _map.clear();
- _unackedSize = 0l;
- }
- }
-
- public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
-
- {
- synchronized (_lock)
- {
- Iterator<Map.Entry<Long, QueueEntry>> it = _map.entrySet().iterator();
- while (it.hasNext())
- {
- Map.Entry<Long, QueueEntry> unacked = it.next();
-
- if (unacked.getKey() > deliveryTag)
- {
- //This should not occur now.
- throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
- " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
- }
-
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- unacked.getValue().discard(storeContext);
-
- it.remove();
-
- _unackedSize -= unacked.getValue().getMessage().getSize();
-
-
- if (unacked.getKey() == deliveryTag)
- {
- break;
- }
- }
- }
- }
-
- public QueueEntry get(long key)
- {
- synchronized (_lock)
- {
- return _map.get(key);
- }
- }
-
- public Set<Long> getDeliveryTags()
- {
- synchronized (_lock)
- {
- return _map.keySet();
- }
- }
-
- private void collect(long key, Map<Long, QueueEntry> msgs)
- {
- synchronized (_lock)
- {
- for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
- {
- msgs.put(entry.getKey(),entry.getValue());
- if (entry.getKey() == key)
- {
- break;
- }
- }
- }
- }
-
- public long getUnacknowledgeBytes()
- {
- return _unackedSize;
- }
-}