summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-06 00:41:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-06 00:41:07 +0000
commit9fc6f9739adf8ef78b847b729cc2311bd9033fa2 (patch)
tree201a53077cd74ba5c7067c2611f57dfb1def06c1
parentd22bb5fc9bd1e246c57f4936da07f089a981d034 (diff)
downloadqpid-python-9fc6f9739adf8ef78b847b729cc2311bd9033fa2.tar.gz
hide methods from the public interfaces of Consumer and MessageInstance
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1565024 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java21
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java5
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java5
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java5
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java24
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java7
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java11
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java29
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java37
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java14
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java8
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java5
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java17
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java18
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java8
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java16
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java18
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java140
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java26
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java8
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java6
31 files changed, 238 insertions, 234 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
index 4c1ebc3547..7f6f7f44e9 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
@@ -44,8 +44,6 @@ public interface Consumer
LogActor getLogActor();
- boolean isTransient();
-
long getBytesOut();
long getMessagesOut();
@@ -63,16 +61,12 @@ public interface Consumer
AMQSessionModel getSessionModel();
- MessageInstance.ConsumerAcquiredState getOwningState();
-
void setNoLocal(boolean noLocal);
long getId();
boolean isSuspended();
- boolean hasInterest(MessageInstance msg);
-
boolean isClosed();
boolean acquires();
@@ -81,17 +75,6 @@ public interface Consumer
void close() throws AMQException;
- void send(MessageInstance entry, boolean batch) throws AMQException;
-
- boolean resend(MessageInstance entry) throws AMQException;
-
- void flushBatched();
-
- void queueDeleted();
-
-
- boolean wouldSuspend(MessageInstance msg);
-
boolean trySendLock();
@@ -99,16 +82,12 @@ public interface Consumer
void releaseSendLock();
- void restoreCredit(final MessageInstance queueEntry);
-
void setStateListener(final StateChangeListener<? extends Consumer, State> listener);
public State getState();
boolean isActive();
- void queueEmpty() throws AMQException;
-
String getName();
void flush() throws AMQException;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index bc5cdaa268..97977acb54 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -427,10 +428,10 @@ public abstract class AbstractExchange implements Exchange
return queues;
}
- public final int send(final ServerMessage message,
+ public final <C extends Consumer> int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<MessageInstance> postEnqueueAction)
+ final Action<MessageInstance<C>> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index dd0121d91b..c25d962fb8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -32,6 +32,7 @@ import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -334,10 +335,10 @@ public class DefaultExchange implements Exchange
return _id;
}
- public final int send(final ServerMessage message,
+ public final <C extends Consumer> int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<MessageInstance> postEnqueueAction)
+ final Action<MessageInstance<C>> postEnqueueAction)
{
final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
index 78e2ab9a15..110c7c5bf5 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.message;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -36,8 +37,8 @@ public interface MessageDestination extends MessageNode
* @param postEnqueueAction action to perform on the result of every enqueue (may be null)
* @return the number of queues in which the message was enqueued performed
*/
- int send(ServerMessage message,
+ <C extends Consumer> int send(ServerMessage message,
InstanceProperties instanceProperties,
ServerTransaction txn,
- Action<MessageInstance> postEnqueueAction);
+ Action<MessageInstance<C>> postEnqueueAction);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index bbe80c1db7..c39e531d41 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
-public interface MessageInstance
+public interface MessageInstance<C extends Consumer>
{
@@ -45,33 +45,33 @@ public interface MessageInstance
void decrementDeliveryCount();
- void addStateChangeListener(StateChangeListener<MessageInstance, State> listener);
+ void addStateChangeListener(StateChangeListener<MessageInstance<C>, State> listener);
- boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener);
+ boolean removeStateChangeListener(StateChangeListener<MessageInstance<C>, State> listener);
boolean acquiredByConsumer();
- boolean isAcquiredBy(Consumer consumer);
+ boolean isAcquiredBy(C consumer);
void setRedelivered();
boolean isRedelivered();
- Consumer getDeliveredConsumer();
+ C getDeliveredConsumer();
void reject();
- boolean isRejectedBy(Consumer consumer);
+ boolean isRejectedBy(C consumer);
boolean getDeliveredToConsumer();
boolean expired() throws AMQException;
- boolean acquire(Consumer sub);
+ boolean acquire(C sub);
int getMaximumDeliveryCount();
- int routeToAlternate(Action<MessageInstance> action, ServerTransaction txn);
+ int routeToAlternate(Action<MessageInstance<C>> action, ServerTransaction txn);
Filterable asFilterable();
@@ -161,11 +161,11 @@ public interface MessageInstance
}
}
- public final class ConsumerAcquiredState extends EntryState
+ public final class ConsumerAcquiredState<C extends Consumer> extends EntryState
{
- private final Consumer _consumer;
+ private final C _consumer;
- public ConsumerAcquiredState(Consumer consumer)
+ public ConsumerAcquiredState(C consumer)
{
_consumer = consumer;
}
@@ -176,7 +176,7 @@ public interface MessageInstance
return State.ACQUIRED;
}
- public Consumer getConsumer()
+ public C getConsumer()
{
return _consumer;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
index c29b9786df..1ef06728f8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -32,13 +32,13 @@ import org.apache.qpid.server.store.TransactionLogResource;
import java.util.Collection;
import java.util.EnumSet;
-public interface MessageSource extends TransactionLogResource, MessageNode
+public interface MessageSource<C extends Consumer> extends TransactionLogResource, MessageNode
{
- Consumer addConsumer(ConsumerTarget target, FilterManager filters,
+ C addConsumer(ConsumerTarget target, FilterManager filters,
Class<? extends ServerMessage> messageClass,
String consumerName, EnumSet<Consumer.Option> options) throws AMQException;
- Collection<Consumer> getConsumers();
+ Collection<C> getConsumers();
void addConsumerRegistrationListener(ConsumerRegistrationListener listener);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index ae07005679..2b5176aa65 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.model;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
@@ -144,11 +145,11 @@ public interface VirtualHost extends ConfiguredObject
public static interface Transaction
{
- void dequeue(QueueEntry entry);
+ void dequeue(MessageInstance entry);
- void copy(QueueEntry entry, Queue queue);
+ void copy(MessageInstance entry, Queue queue);
- void move(QueueEntry entry, Queue queue);
+ void move(MessageInstance entry, Queue queue);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 300b6e6618..13fe4e6b72 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -44,6 +44,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -759,11 +760,11 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
op.withinTransaction(new Transaction()
{
- public void dequeue(final QueueEntry entry)
+ public void dequeue(final MessageInstance entry)
{
if(entry.acquire())
{
- txn.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action()
+ txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
{
public void postCommit()
{
@@ -777,7 +778,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
}
- public void copy(QueueEntry entry, Queue queue)
+ public void copy(MessageInstance entry, Queue queue)
{
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
@@ -803,7 +804,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
- public void move(final QueueEntry entry, Queue queue)
+ public void move(final MessageInstance entry, Queue queue)
{
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
@@ -830,7 +831,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
entry.release();
}
});
- txn.dequeue(entry.getQueue(), message,
+ txn.dequeue(entry.getOwningResource(), message,
new ServerTransaction.Action()
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 4fe6117d88..c4359d8a40 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -36,7 +36,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker
+public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, ExchangeReferrer, BaseQueue<C>, MessageSource<C>, CapacityChecker
{
public interface NotificationListener
@@ -183,8 +183,6 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, BaseQu
Set<NotificationCheck> getNotificationChecks();
- void flushConsumer(final Consumer sub) throws AMQException;
-
void deliverAsync();
void stop();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
index 972488da4b..fce0572b3b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
@@ -22,15 +22,15 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
-public interface BaseQueue extends TransactionLogResource
+public interface BaseQueue<C extends Consumer> extends TransactionLogResource
{
- void enqueue(ServerMessage message) throws AMQException;
- void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException;
+ void enqueue(ServerMessage message, Action<MessageInstance<C>> action) throws AMQException;
boolean isDurable();
boolean isDeleted();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index e4725e0e2a..7469e95394 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -43,7 +43,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
- public ConflationQueueList(AMQQueue queue, String conflationKey)
+ public ConflationQueueList(AMQQueue<QueueConsumer> queue, String conflationKey)
{
super(queue);
_conflationKey = conflationKey;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
index 22cb6aeb7b..4c74e5ba0b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
@@ -241,17 +241,17 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
return groupVal;
}
- private class GroupStateChangeListener implements StateChangeListener<MessageInstance, QueueEntry.State>
+ private class GroupStateChangeListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
{
private final Group _group;
public GroupStateChangeListener(final Group group,
- final MessageInstance entry)
+ final MessageInstance<QueueConsumer> entry)
{
_group = group;
}
- public void stateChanged(final MessageInstance entry,
+ public void stateChanged(final MessageInstance<QueueConsumer> entry,
final MessageInstance.State oldState,
final MessageInstance.State newState)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index 66315af9fb..05d84327d4 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage;
public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
{
- private final AMQQueue _queue;
+ private final AMQQueue<QueueConsumer> _queue;
private final PriorityQueueEntrySubList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
@@ -46,7 +46,7 @@ public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
return _priorities;
}
- public AMQQueue getQueue()
+ public AMQQueue<QueueConsumer> getQueue()
{
return _queue;
}
@@ -166,7 +166,7 @@ public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
{
private int _listPriority;
- public PriorityQueueEntrySubList(AMQQueue queue, int listPriority)
+ public PriorityQueueEntrySubList(AMQQueue<QueueConsumer> queue, int listPriority)
{
super(queue);
_listPriority = listPriority;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index d8332f7306..f1b612a086 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -213,32 +213,27 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
}
}
- @Override
- public void flushBatched()
+ void flushBatched()
{
_target.flushBatched();
}
- @Override
- public void queueDeleted()
+ void queueDeleted()
{
_target.queueDeleted();
}
- @Override
- public boolean wouldSuspend(final MessageInstance msg)
+ boolean wouldSuspend(final MessageInstance msg)
{
return !_target.allocateCredit(msg.getMessage());
}
- @Override
- public void restoreCredit(final MessageInstance queueEntry)
+ void restoreCredit(final MessageInstance queueEntry)
{
_target.restoreCredit(queueEntry.getMessage());
}
- @Override
- public void queueEmpty() throws AMQException
+ void queueEmpty() throws AMQException
{
_target.queueEmpty();
}
@@ -298,8 +293,7 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
getQueue().flushConsumer(this);
}
- @Override
- public boolean resend(final MessageInstance entry) throws AMQException
+ boolean resend(final MessageInstance entry) throws AMQException
{
return getQueue().resend((QueueEntry)entry, this);
}
@@ -430,7 +424,7 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
return _createTime;
}
- public final MessageInstance.ConsumerAcquiredState getOwningState()
+ final MessageInstance.ConsumerAcquiredState getOwningState()
{
return _owningState;
}
@@ -465,10 +459,15 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
return _deliveredCount.longValue();
}
- public final void send(final MessageInstance entry, final boolean batch) throws AMQException
+ final void send(final QueueEntry entry, final boolean batch) throws AMQException
{
_deliveredCount.incrementAndGet();
- _deliveredBytes.addAndGet(entry.getMessage().getSize());
+ ServerMessage message = entry.getMessage();
+ if(message == null)
+ {
+ throw new AMQException("message was null!");
+ }
+ _deliveredBytes.addAndGet(message.getSize());
_target.send(entry, batch);
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index d984cf8ab4..6a42088c47 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -22,10 +22,10 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageInstance;
-public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
+public interface QueueEntry extends MessageInstance<QueueConsumer>, Comparable<QueueEntry>
{
- AMQQueue getQueue();
+ AMQQueue<QueueConsumer> getQueue();
long getSize();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index d39f49a619..788a40bf85 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -63,7 +63,7 @@ public abstract class QueueEntryImpl implements QueueEntry
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener<MessageInstance, State>> _stateChangeListeners;
+ private volatile Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -138,7 +138,7 @@ public abstract class QueueEntryImpl implements QueueEntry
return _entryId;
}
- public AMQQueue getQueue()
+ public AMQQueue<QueueConsumer> getQueue()
{
return _queueEntryList.getQueue();
}
@@ -202,7 +202,7 @@ public abstract class QueueEntryImpl implements QueueEntry
return acquired;
}
- public boolean acquire(Consumer sub)
+ public boolean acquire(QueueConsumer sub)
{
final boolean acquired = acquire(sub.getOwningState());
if(acquired)
@@ -218,7 +218,7 @@ public abstract class QueueEntryImpl implements QueueEntry
return (_state instanceof ConsumerAcquiredState);
}
- public boolean isAcquiredBy(Consumer consumer)
+ public boolean isAcquiredBy(QueueConsumer consumer)
{
EntryState state = _state;
return state instanceof ConsumerAcquiredState
@@ -264,12 +264,12 @@ public abstract class QueueEntryImpl implements QueueEntry
return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
}
- public Consumer getDeliveredConsumer()
+ public QueueConsumer getDeliveredConsumer()
{
EntryState state = _state;
if (state instanceof ConsumerAcquiredState)
{
- return ((ConsumerAcquiredState) state).getConsumer();
+ return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
}
else
{
@@ -279,7 +279,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public void reject()
{
- Consumer consumer = getDeliveredConsumer();
+ QueueConsumer consumer = getDeliveredConsumer();
if (consumer != null)
{
@@ -296,7 +296,7 @@ public abstract class QueueEntryImpl implements QueueEntry
}
}
- public boolean isRejectedBy(Consumer consumer)
+ public boolean isRejectedBy(QueueConsumer consumer)
{
if (_rejectedBy != null) // We have consumers that rejected this message
@@ -333,7 +333,7 @@ public abstract class QueueEntryImpl implements QueueEntry
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener<MessageInstance, State> l : _stateChangeListeners)
+ for(StateChangeListener<MessageInstance<QueueConsumer>, State> l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
@@ -364,7 +364,7 @@ public abstract class QueueEntryImpl implements QueueEntry
dispose();
}
- public int routeToAlternate(final Action<MessageInstance> action, ServerTransaction txn)
+ public int routeToAlternate(final Action<MessageInstance<QueueConsumer>> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -376,7 +376,10 @@ public abstract class QueueEntryImpl implements QueueEntry
txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
}
- int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action);
+ int enqueues = alternateExchange.send(getMessage(),
+ getInstanceProperties(),
+ txn,
+ action);
txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
{
@@ -409,21 +412,21 @@ public abstract class QueueEntryImpl implements QueueEntry
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener)
+ public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
{
- Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance, State>>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance<QueueConsumer>, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener)
+ public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
{
- Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
@@ -491,7 +494,7 @@ public abstract class QueueEntryImpl implements QueueEntry
@Override
public boolean resend() throws AMQException
{
- Consumer sub = getDeliveredConsumer();
+ QueueConsumer sub = getDeliveredConsumer();
if(sub != null)
{
return sub.resend(this);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 641aaa0a08..ad1f703f51 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage;
public interface QueueEntryList<Q extends QueueEntry>
{
- AMQQueue getQueue();
+ AMQQueue<QueueConsumer> getQueue();
Q add(ServerMessage message);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 7435c690b3..9bf237660c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -56,7 +56,7 @@ import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class SimpleAMQQueue implements AMQQueue,
+public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
StateChangeListener<QueueConsumer, Consumer.State>,
MessageGroupManager.ConsumerResetHelper
{
@@ -525,9 +525,9 @@ public class SimpleAMQQueue implements AMQQueue,
}
- public Collection<Consumer> getConsumers()
+ public Collection<QueueConsumer> getConsumers()
{
- List<Consumer> consumers = new ArrayList<Consumer>();
+ List<QueueConsumer> consumers = new ArrayList<QueueConsumer>();
QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
while(iter.advance())
{
@@ -636,7 +636,7 @@ public class SimpleAMQQueue implements AMQQueue,
enqueue(message, null);
}
- public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException
{
incrementQueueCount();
incrementQueueSize(message);
@@ -1464,7 +1464,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
- public void flushConsumer(Consumer sub) throws AMQException
+ void flushConsumer(QueueConsumer sub) throws AMQException
{
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
@@ -1474,7 +1474,7 @@ public class SimpleAMQQueue implements AMQQueue,
flushConsumer(sub, Long.MAX_VALUE);
}
- public boolean flushConsumer(Consumer sub, long iterations) throws AMQException
+ boolean flushConsumer(QueueConsumer sub, long iterations) throws AMQException
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1968,7 +1968,7 @@ public class SimpleAMQQueue implements AMQQueue,
return _notificationChecks;
}
- private final class QueueEntryListener implements StateChangeListener<MessageInstance, QueueEntry.State>
+ private final class QueueEntryListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
{
private final QueueConsumer _sub;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index b8d8ec19f4..101771c7cc 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -39,7 +39,7 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
(SimpleQueueEntryList.class, SimpleQueueEntryImpl.class, "_tail");
- private final AMQQueue _queue;
+ private final AMQQueue<QueueConsumer> _queue;
static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
_nextUpdater = SimpleQueueEntryImpl._nextUpdater;
@@ -49,7 +49,7 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new AtomicReference<SimpleQueueEntryImpl>();
- public SimpleQueueEntryList(AMQQueue queue)
+ public SimpleQueueEntryList(AMQQueue<QueueConsumer> queue)
{
_queue = queue;
_head = new SimpleQueueEntryImpl(this);
@@ -71,7 +71,7 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
}
- public AMQQueue getQueue()
+ public AMQQueue<QueueConsumer> getQueue()
{
return _queue;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index 3185abc6cd..1f1e50a058 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -50,7 +50,7 @@ public class SortedQueue extends OutOfOrderQueue
return _sortedPropertyName;
}
- public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException
{
synchronized (_sortedQueueLock)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
index 85559157a9..336ee566eb 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -28,7 +28,7 @@ import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
* Uses the red/black tree algorithm specified in "Introduction to Algorithms".
* ISBN-10: 0262033844
* ISBN-13: 978-0262033848
- * @see http://en.wikipedia.org/wiki/Red-black_tree
+ * see http://en.wikipedia.org/wiki/Red-black_tree
*/
public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl>
{
@@ -36,17 +36,17 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
private SortedQueueEntryImpl _root;
private long _entryId = Long.MIN_VALUE;
private final Object _lock = new Object();
- private final AMQQueue _queue;
+ private final AMQQueue<QueueConsumer> _queue;
private final String _propertyName;
- public SortedQueueEntryList(final AMQQueue queue, final String propertyName)
+ public SortedQueueEntryList(final AMQQueue<QueueConsumer> queue, final String propertyName)
{
_queue = queue;
_head = new SortedQueueEntryImpl(this);
_propertyName = propertyName;
}
- public AMQQueue getQueue()
+ public AMQQueue<QueueConsumer> getQueue()
{
return _queue;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
index a688b493e1..de7369f5ed 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -46,7 +47,7 @@ public class DurableConfigurationStoreHelper
Queue.EXCLUSIVE,
Queue.ALTERNATE_EXCHANGE));
- public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
+ public static void updateQueue(DurableConfigurationStore store, AMQQueue<? extends Consumer> queue) throws AMQStoreException
{
Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
attributesMap.put(Queue.NAME, queue.getName());
@@ -71,7 +72,7 @@ public class DurableConfigurationStoreHelper
store.update(queue.getId(), QUEUE, attributesMap);
}
- public static void createQueue(DurableConfigurationStore store, AMQQueue queue)
+ public static void createQueue(DurableConfigurationStore store, AMQQueue<? extends Consumer> queue)
throws AMQStoreException
{
Map<String, Object> attributesMap = new HashMap<String, Object>();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index 764549626a..ea9d0ac693 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -25,6 +25,7 @@ import junit.framework.Assert;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -85,7 +86,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testDirectMatch() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
+ AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -108,7 +109,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testStarMatch() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
+ AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
@@ -139,7 +140,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashMatch() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
+ AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
@@ -190,7 +191,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testMidHash() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
@@ -216,7 +217,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testMatchAfterHash() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
@@ -255,7 +256,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashAfterHash() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
@@ -277,7 +278,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashHash() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
@@ -321,7 +322,7 @@ public class TopicExchangeTest extends QpidTestCase
when(message.getMessageNumber()).thenReturn(messageNumber);
for(BaseQueue q : queues)
{
- q.enqueue(message);
+ q.enqueue(message, null);
}
return queues.size();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index 7092b50c52..ced00dc578 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -51,19 +51,19 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
// Enqueue messages in order
SimpleAMQQueue queue = getQueue();
- queue.enqueue(createMessage(1L, (byte) 10));
- queue.enqueue(createMessage(2L, (byte) 4));
- queue.enqueue(createMessage(3L, (byte) 0));
+ queue.enqueue(createMessage(1L, (byte) 10), null);
+ queue.enqueue(createMessage(2L, (byte) 4), null);
+ queue.enqueue(createMessage(3L, (byte) 0), null);
// Enqueue messages in reverse order
- queue.enqueue(createMessage(4L, (byte) 0));
- queue.enqueue(createMessage(5L, (byte) 4));
- queue.enqueue(createMessage(6L, (byte) 10));
+ queue.enqueue(createMessage(4L, (byte) 0), null);
+ queue.enqueue(createMessage(5L, (byte) 4), null);
+ queue.enqueue(createMessage(6L, (byte) 10), null);
// Enqueue messages out of order
- queue.enqueue(createMessage(7L, (byte) 4));
- queue.enqueue(createMessage(8L, (byte) 10));
- queue.enqueue(createMessage(9L, (byte) 0));
+ queue.enqueue(createMessage(7L, (byte) 4), null);
+ queue.enqueue(createMessage(8L, (byte) 10), null);
+ queue.enqueue(createMessage(9L, (byte) 0), null);
// Register subscriber
queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(Consumer.Option.class));
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 1158781beb..e9c9a0305f 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -43,7 +43,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
-public class MockAMQQueue implements AMQQueue
+public class MockAMQQueue implements AMQQueue<QueueConsumer>
{
private boolean _deleted = false;
private String _name;
@@ -208,7 +208,7 @@ public class MockAMQQueue implements AMQQueue
}
@Override
- public Consumer addConsumer(final ConsumerTarget target,
+ public QueueConsumer addConsumer(final ConsumerTarget target,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
@@ -226,7 +226,7 @@ public class MockAMQQueue implements AMQQueue
- public Collection<Consumer> getConsumers()
+ public Collection<QueueConsumer> getConsumers()
{
return Collections.emptyList();
}
@@ -306,7 +306,7 @@ public class MockAMQQueue implements AMQQueue
{
}
- public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException
{
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 95a6030d6a..7ab7a07ab6 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -42,7 +42,7 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public boolean acquire(Consumer sub)
+ public boolean acquire(QueueConsumer sub)
{
return false;
}
@@ -58,12 +58,12 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public boolean isAcquiredBy(Consumer consumer)
+ public boolean isAcquiredBy(QueueConsumer consumer)
{
return false;
}
- public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener)
+ public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
{
}
@@ -73,7 +73,7 @@ public class MockQueueEntry implements QueueEntry
}
- public int routeToAlternate(final Action<MessageInstance> action, final ServerTransaction txn)
+ public int routeToAlternate(final Action<MessageInstance<QueueConsumer>> action, final ServerTransaction txn)
{
return 0;
}
@@ -88,7 +88,7 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public Consumer getDeliveredConsumer()
+ public QueueConsumer getDeliveredConsumer()
{
return null;
}
@@ -103,7 +103,7 @@ public class MockQueueEntry implements QueueEntry
return _message;
}
- public AMQQueue getQueue()
+ public AMQQueue<QueueConsumer> getQueue()
{
return null;
}
@@ -126,7 +126,7 @@ public class MockQueueEntry implements QueueEntry
}
- public boolean isRejectedBy(Consumer consumer)
+ public boolean isRejectedBy(QueueConsumer consumer)
{
return false;
@@ -153,7 +153,7 @@ public class MockQueueEntry implements QueueEntry
}
- public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener)
+ public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
{
return false;
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 0b2c6c900c..95139d8740 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -21,11 +21,11 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageInstance.EntryState;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import java.lang.reflect.Field;
@@ -113,16 +113,16 @@ public abstract class QueueEntryImplTestBase extends TestCase
*/
private void acquire()
{
- _queueEntry.acquire(newMockConsumer());
+ _queueEntry.acquire(newConsumer());
assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
_queueEntry.isAcquired());
}
- private Consumer newMockConsumer()
+ private QueueConsumer newConsumer()
{
- final Consumer consumer = mock(Consumer.class);
- when(consumer.getOwningState()).thenReturn(new MessageInstance.ConsumerAcquiredState(consumer));
- when(consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
+ final ConsumerTarget target = mock(ConsumerTarget.class);
+ when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
+ final QueueConsumer consumer = new QueueConsumer(null,null,true,true,"mock",false,target);
return consumer;
}
@@ -153,7 +153,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
*/
public void testRejectAndRejectedBy()
{
- Consumer sub = newMockConsumer();
+ QueueConsumer sub = newConsumer();
assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
assertFalse("Queue entry should not yet have been acquired by a consumer", _queueEntry.isAcquired());
@@ -167,7 +167,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
//repeat rejection using a second consumer
- Consumer sub2 = newMockConsumer();
+ QueueConsumer sub2 = newConsumer();
assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 62120f26d3..f2b53f95c3 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -168,14 +168,15 @@ public class SimpleAMQQueueTest extends QpidTestCase
// Check adding a consumer adds it to the queue
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
_queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- _queue.enqueue(messageA);
+ _queue.enqueue(messageA, null);
try
{
Thread.sleep(2000L);
@@ -194,7 +195,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
1 == _queue.getActiveConsumerCount());
ServerMessage messageB = createMessage(new Long (25));
- _queue.enqueue(messageB);
+ _queue.enqueue(messageB, null);
assertNull(_consumer.getQueueContext());
}
@@ -202,9 +203,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException
{
ServerMessage messageA = createMessage(new Long(24));
- _queue.enqueue(messageA);
+ _queue.enqueue(messageA, null);
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
Thread.sleep(150);
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
@@ -218,10 +220,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
- _queue.enqueue(messageA);
- _queue.enqueue(messageB);
+ _queue.enqueue(messageA, null);
+ _queue.enqueue(messageB, null);
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
Thread.sleep(150);
assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
@@ -245,13 +248,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
- {
- public void performAction(MessageInstance entry)
- {
- queueEntries.add((QueueEntry) entry);
- }
- };
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue three messages */
@@ -298,13 +295,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Consumer.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
- {
- public void performAction(MessageInstance entry)
- {
- queueEntries.add((QueueEntry) entry);
- }
- };
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue one message with expiration set for a short time in the future */
@@ -356,13 +347,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
- {
- public void performAction(MessageInstance entry)
- {
- queueEntries.add((QueueEntry) entry);
- }
- };
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue three messages */
@@ -420,14 +405,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
- {
- public void performAction(MessageInstance entry)
- {
- queueEntries.add((QueueEntry)entry);
- }
- };
-
+ EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue two messages */
@@ -460,7 +438,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
// Check adding an exclusive consumer adds it to the queue
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.EXCLUSIVE));
+ EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
@@ -468,7 +447,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- _queue.enqueue(messageA);
+ _queue.enqueue(messageA, null);
try
{
Thread.sleep(2000L);
@@ -485,7 +464,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
_queue.addConsumer(subB, null, messageA.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
}
catch (AMQException e)
@@ -498,7 +478,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
// existing consumer
_consumer.close();
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
try
{
@@ -522,9 +503,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage message = createMessage(new Long(25));
_consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
_consumer.close();
assertTrue("Queue was not deleted when consumer was removed",
_queue.isDeleted());
@@ -536,12 +518,27 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage message = createMessage(id);
_consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+
+ _queue.enqueue(message, new Action<MessageInstance<QueueConsumer>>()
+ {
+ @Override
+ public void performAction(final MessageInstance<QueueConsumer> object)
+ {
+ QueueEntry entry = (QueueEntry) object;
+ entry.setRedelivered();
+ try
+ {
+ _consumer.resend(entry);
+ }
+ catch (AMQException e)
+ {
+ fail("Exception thrown: " + e.getMessage());
+ }
+ }
+ });
+
- _queue.enqueue(message);
- QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry();
- entry.setRedelivered();
- _consumer.resend(entry);
}
@@ -552,7 +549,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
// Get message id
Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
@@ -568,7 +565,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -589,7 +586,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -610,7 +607,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message);
+ _queue.enqueue(message, null);
}
// Get non-existent 0th QueueEntry & check returned list was empty
@@ -953,7 +950,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
null,
entries.get(0).getMessage().getClass(),
"test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
// process queue
testQueue.processQueue(new QueueRunner(testQueue)
@@ -1018,7 +1016,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
@Override
- public boolean acquire(Consumer sub)
+ public boolean acquire(QueueConsumer sub)
{
if(message.getMessageNumber() % 2 == 0)
{
@@ -1044,7 +1042,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
null,
createMessage(-1l).getClass(),
"test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
}
catch (AMQException e)
{
@@ -1077,7 +1076,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
null,
createMessage(-1l).getClass(),
"test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify adding an inactive consumer doesn't increase the count
@@ -1089,7 +1089,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
null,
createMessage(-1l).getClass(),
"test",
- EnumSet.noneOf(Consumer.Option.class));
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify behaviour in face of expected state changes:
@@ -1133,10 +1134,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.setNotificationListener(listener);
_queue.setMaximumMessageCount(2);
- _queue.enqueue(createMessage(new Long(24)));
+ _queue.enqueue(createMessage(new Long(24)), null);
verifyZeroInteractions(listener);
- _queue.enqueue(createMessage(new Long(25)));
+ _queue.enqueue(createMessage(new Long(25)), null);
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
@@ -1145,9 +1146,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class);
- _queue.enqueue(createMessage(new Long(24)));
- _queue.enqueue(createMessage(new Long(25)));
- _queue.enqueue(createMessage(new Long(26)));
+ _queue.enqueue(createMessage(new Long(24)), null);
+ _queue.enqueue(createMessage(new Long(25)), null);
+ _queue.enqueue(createMessage(new Long(26)), null);
_queue.setNotificationListener(listener);
_queue.setMaximumMessageCount(2);
@@ -1309,6 +1310,21 @@ public class SimpleAMQQueueTest extends QpidTestCase
return message;
}
+ private static class EntryListAddingAction implements Action<MessageInstance<QueueConsumer>>
+ {
+ private final ArrayList<QueueEntry> _queueEntries;
+
+ public EntryListAddingAction(final ArrayList<QueueEntry> queueEntries)
+ {
+ _queueEntries = queueEntries;
+ }
+
+ public void performAction(MessageInstance entry)
+ {
+ _queueEntries.add((QueueEntry) entry);
+ }
+ }
+
class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
{
QueueEntryList _list;
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index b5c4724292..53022c333e 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -105,18 +106,7 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final Action<MessageInstance> _checkCapacityAction = new Action<MessageInstance>()
- {
- @Override
- public void performAction(final MessageInstance entry)
- {
- TransactionLogResource queue = entry.getOwningResource();
- if(queue instanceof CapacityChecker)
- {
- ((CapacityChecker)queue).checkCapacity(ServerSession.this);
- }
- }
- };
+ private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
public static interface MessageDispositionChangeListener
{
@@ -938,4 +928,16 @@ public class ServerSession extends Session
return getId().compareTo(o.getId());
}
+ private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>>
+ {
+ @Override
+ public void performAction(final MessageInstance<C> entry)
+ {
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(ServerSession.this);
+ }
+ }
+ }
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index dc9a6484fa..c7029e83a2 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1192,14 +1192,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- private class ImmediateAction implements Action<MessageInstance>
+ private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>>
{
public ImmediateAction()
{
}
- public void performAction(MessageInstance entry)
+ public void performAction(MessageInstance<C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
@@ -1258,10 +1258,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- private final class CapacityCheckAction implements Action<MessageInstance>
+ private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>>
{
@Override
- public void performAction(final MessageInstance entry)
+ public void performAction(final MessageInstance<C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
if(queue instanceof CapacityChecker)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
index e8c922b023..aa5a75396a 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
@@ -62,7 +62,7 @@ public class ExtractResendAndRequeueTest extends TestCase
private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
private AMQQueue _queue;
- private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
+ private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>();
private Consumer _consumer;
private boolean _queueDeleted;
@@ -115,11 +115,11 @@ public class ExtractResendAndRequeueTest extends TestCase
*
* @return Subscription that performed the acquire
*/
- private void acquireMessages(LinkedList<QueueEntry> messageList)
+ private void acquireMessages(LinkedList<MessageInstance> messageList)
{
// Acquire messages in subscription
- for(QueueEntry entry : messageList)
+ for(MessageInstance entry : messageList)
{
when(entry.getDeliveredConsumer()).thenReturn(_consumer);
}