From 9fc6f9739adf8ef78b847b729cc2311bd9033fa2 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 6 Feb 2014 00:41:07 +0000 Subject: hide methods from the public interfaces of Consumer and MessageInstance git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1565024 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/consumer/Consumer.java | 21 ---- .../qpid/server/exchange/AbstractExchange.java | 5 +- .../qpid/server/exchange/DefaultExchange.java | 5 +- .../qpid/server/message/MessageDestination.java | 5 +- .../qpid/server/message/MessageInstance.java | 24 ++-- .../apache/qpid/server/message/MessageSource.java | 6 +- .../org/apache/qpid/server/model/VirtualHost.java | 7 +- .../server/model/adapter/VirtualHostAdapter.java | 11 +- .../org/apache/qpid/server/queue/AMQQueue.java | 4 +- .../org/apache/qpid/server/queue/BaseQueue.java | 6 +- .../qpid/server/queue/ConflationQueueList.java | 2 +- .../queue/DefinedGroupMessageGroupManager.java | 6 +- .../qpid/server/queue/PriorityQueueList.java | 6 +- .../apache/qpid/server/queue/QueueConsumer.java | 29 +++-- .../org/apache/qpid/server/queue/QueueEntry.java | 4 +- .../apache/qpid/server/queue/QueueEntryImpl.java | 37 +++--- .../apache/qpid/server/queue/QueueEntryList.java | 2 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 14 +-- .../qpid/server/queue/SimpleQueueEntryList.java | 6 +- .../org/apache/qpid/server/queue/SortedQueue.java | 2 +- .../qpid/server/queue/SortedQueueEntryList.java | 8 +- .../store/DurableConfigurationStoreHelper.java | 5 +- .../qpid/server/exchange/TopicExchangeTest.java | 17 +-- .../qpid/server/queue/AMQPriorityQueueTest.java | 18 +-- .../org/apache/qpid/server/queue/MockAMQQueue.java | 8 +- .../apache/qpid/server/queue/MockQueueEntry.java | 16 +-- .../qpid/server/queue/QueueEntryImplTestBase.java | 18 +-- .../qpid/server/queue/SimpleAMQQueueTest.java | 140 ++++++++++++--------- .../qpid/server/protocol/v0_10/ServerSession.java | 26 ++-- .../qpid/server/protocol/v0_8/AMQChannel.java | 8 +- .../protocol/v0_8/ExtractResendAndRequeueTest.java | 6 +- 31 files changed, 238 insertions(+), 234 deletions(-) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java index 4c1ebc3547..7f6f7f44e9 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java @@ -44,8 +44,6 @@ public interface Consumer LogActor getLogActor(); - boolean isTransient(); - long getBytesOut(); long getMessagesOut(); @@ -63,16 +61,12 @@ public interface Consumer AMQSessionModel getSessionModel(); - MessageInstance.ConsumerAcquiredState getOwningState(); - void setNoLocal(boolean noLocal); long getId(); boolean isSuspended(); - boolean hasInterest(MessageInstance msg); - boolean isClosed(); boolean acquires(); @@ -81,17 +75,6 @@ public interface Consumer void close() throws AMQException; - void send(MessageInstance entry, boolean batch) throws AMQException; - - boolean resend(MessageInstance entry) throws AMQException; - - void flushBatched(); - - void queueDeleted(); - - - boolean wouldSuspend(MessageInstance msg); - boolean trySendLock(); @@ -99,16 +82,12 @@ public interface Consumer void releaseSendLock(); - void restoreCredit(final MessageInstance queueEntry); - void setStateListener(final StateChangeListener listener); public State getState(); boolean isActive(); - void queueEmpty() throws AMQException; - String getName(); void flush() throws AMQException; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index bc5cdaa268..97977acb54 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BindingMessages; @@ -427,10 +428,10 @@ public abstract class AbstractExchange implements Exchange return queues; } - public final int send(final ServerMessage message, + public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action postEnqueueAction) + final Action> postEnqueueAction) { List queues = route(message, instanceProperties); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index dd0121d91b..c25d962fb8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -32,6 +32,7 @@ import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ExchangeMessages; @@ -334,10 +335,10 @@ public class DefaultExchange implements Exchange return _id; } - public final int send(final ServerMessage message, + public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action postEnqueueAction) + final Action> postEnqueueAction) { final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); if(q == null) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java index 78e2ab9a15..110c7c5bf5 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.message; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -36,8 +37,8 @@ public interface MessageDestination extends MessageNode * @param postEnqueueAction action to perform on the result of every enqueue (may be null) * @return the number of queues in which the message was enqueued performed */ - int send(ServerMessage message, + int send(ServerMessage message, InstanceProperties instanceProperties, ServerTransaction txn, - Action postEnqueueAction); + Action> postEnqueueAction); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index bbe80c1db7..c39e531d41 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; -public interface MessageInstance +public interface MessageInstance { @@ -45,33 +45,33 @@ public interface MessageInstance void decrementDeliveryCount(); - void addStateChangeListener(StateChangeListener listener); + void addStateChangeListener(StateChangeListener, State> listener); - boolean removeStateChangeListener(StateChangeListener listener); + boolean removeStateChangeListener(StateChangeListener, State> listener); boolean acquiredByConsumer(); - boolean isAcquiredBy(Consumer consumer); + boolean isAcquiredBy(C consumer); void setRedelivered(); boolean isRedelivered(); - Consumer getDeliveredConsumer(); + C getDeliveredConsumer(); void reject(); - boolean isRejectedBy(Consumer consumer); + boolean isRejectedBy(C consumer); boolean getDeliveredToConsumer(); boolean expired() throws AMQException; - boolean acquire(Consumer sub); + boolean acquire(C sub); int getMaximumDeliveryCount(); - int routeToAlternate(Action action, ServerTransaction txn); + int routeToAlternate(Action> action, ServerTransaction txn); Filterable asFilterable(); @@ -161,11 +161,11 @@ public interface MessageInstance } } - public final class ConsumerAcquiredState extends EntryState + public final class ConsumerAcquiredState extends EntryState { - private final Consumer _consumer; + private final C _consumer; - public ConsumerAcquiredState(Consumer consumer) + public ConsumerAcquiredState(C consumer) { _consumer = consumer; } @@ -176,7 +176,7 @@ public interface MessageInstance return State.ACQUIRED; } - public Consumer getConsumer() + public C getConsumer() { return _consumer; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java index c29b9786df..1ef06728f8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java @@ -32,13 +32,13 @@ import org.apache.qpid.server.store.TransactionLogResource; import java.util.Collection; import java.util.EnumSet; -public interface MessageSource extends TransactionLogResource, MessageNode +public interface MessageSource extends TransactionLogResource, MessageNode { - Consumer addConsumer(ConsumerTarget target, FilterManager filters, + C addConsumer(ConsumerTarget target, FilterManager filters, Class messageClass, String consumerName, EnumSet options) throws AMQException; - Collection getConsumers(); + Collection getConsumers(); void addConsumerRegistrationListener(ConsumerRegistrationListener listener); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index ae07005679..2b5176aa65 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.model; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; @@ -144,11 +145,11 @@ public interface VirtualHost extends ConfiguredObject public static interface Transaction { - void dequeue(QueueEntry entry); + void dequeue(MessageInstance entry); - void copy(QueueEntry entry, Queue queue); + void copy(MessageInstance entry, Queue queue); - void move(QueueEntry entry, Queue queue); + void move(MessageInstance entry, Queue queue); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 300b6e6618..13fe4e6b72 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -44,6 +44,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -759,11 +760,11 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual op.withinTransaction(new Transaction() { - public void dequeue(final QueueEntry entry) + public void dequeue(final MessageInstance entry) { if(entry.acquire()) { - txn.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action() + txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action() { public void postCommit() { @@ -777,7 +778,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } } - public void copy(QueueEntry entry, Queue queue) + public void copy(MessageInstance entry, Queue queue) { final ServerMessage message = entry.getMessage(); final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue(); @@ -803,7 +804,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } - public void move(final QueueEntry entry, Queue queue) + public void move(final MessageInstance entry, Queue queue) { final ServerMessage message = entry.getMessage(); final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue(); @@ -830,7 +831,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual entry.release(); } }); - txn.dequeue(entry.getQueue(), message, + txn.dequeue(entry.getOwningResource(), message, new ServerTransaction.Action() { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 4fe6117d88..c4359d8a40 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -36,7 +36,7 @@ import java.util.Collection; import java.util.List; import java.util.Set; -public interface AMQQueue extends Comparable, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker +public interface AMQQueue extends Comparable>, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker { public interface NotificationListener @@ -183,8 +183,6 @@ public interface AMQQueue extends Comparable, ExchangeReferrer, BaseQu Set getNotificationChecks(); - void flushConsumer(final Consumer sub) throws AMQException; - void deliverAsync(); void stop(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index 972488da4b..fce0572b3b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -22,15 +22,15 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; -public interface BaseQueue extends TransactionLogResource +public interface BaseQueue extends TransactionLogResource { - void enqueue(ServerMessage message) throws AMQException; - void enqueue(ServerMessage message, Action action) throws AMQException; + void enqueue(ServerMessage message, Action> action) throws AMQException; boolean isDurable(); boolean isDeleted(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index e4725e0e2a..7469e95394 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -43,7 +43,7 @@ public class ConflationQueueList extends SimpleQueueEntryList private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this); private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this); - public ConflationQueueList(AMQQueue queue, String conflationKey) + public ConflationQueueList(AMQQueue queue, String conflationKey) { super(queue); _conflationKey = conflationKey; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index 22cb6aeb7b..4c74e5ba0b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -241,17 +241,17 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return groupVal; } - private class GroupStateChangeListener implements StateChangeListener + private class GroupStateChangeListener implements StateChangeListener, QueueEntry.State> { private final Group _group; public GroupStateChangeListener(final Group group, - final MessageInstance entry) + final MessageInstance entry) { _group = group; } - public void stateChanged(final MessageInstance entry, + public void stateChanged(final MessageInstance entry, final MessageInstance.State oldState, final MessageInstance.State newState) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index 66315af9fb..05d84327d4 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage; public class PriorityQueueList implements QueueEntryList { - private final AMQQueue _queue; + private final AMQQueue _queue; private final PriorityQueueEntrySubList[] _priorityLists; private final int _priorities; private final int _priorityOffset; @@ -46,7 +46,7 @@ public class PriorityQueueList implements QueueEntryList return _priorities; } - public AMQQueue getQueue() + public AMQQueue getQueue() { return _queue; } @@ -166,7 +166,7 @@ public class PriorityQueueList implements QueueEntryList { private int _listPriority; - public PriorityQueueEntrySubList(AMQQueue queue, int listPriority) + public PriorityQueueEntrySubList(AMQQueue queue, int listPriority) { super(queue); _listPriority = listPriority; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index d8332f7306..f1b612a086 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -213,32 +213,27 @@ class QueueConsumer implements Consumer } } - @Override - public void flushBatched() + void flushBatched() { _target.flushBatched(); } - @Override - public void queueDeleted() + void queueDeleted() { _target.queueDeleted(); } - @Override - public boolean wouldSuspend(final MessageInstance msg) + boolean wouldSuspend(final MessageInstance msg) { return !_target.allocateCredit(msg.getMessage()); } - @Override - public void restoreCredit(final MessageInstance queueEntry) + void restoreCredit(final MessageInstance queueEntry) { _target.restoreCredit(queueEntry.getMessage()); } - @Override - public void queueEmpty() throws AMQException + void queueEmpty() throws AMQException { _target.queueEmpty(); } @@ -298,8 +293,7 @@ class QueueConsumer implements Consumer getQueue().flushConsumer(this); } - @Override - public boolean resend(final MessageInstance entry) throws AMQException + boolean resend(final MessageInstance entry) throws AMQException { return getQueue().resend((QueueEntry)entry, this); } @@ -430,7 +424,7 @@ class QueueConsumer implements Consumer return _createTime; } - public final MessageInstance.ConsumerAcquiredState getOwningState() + final MessageInstance.ConsumerAcquiredState getOwningState() { return _owningState; } @@ -465,10 +459,15 @@ class QueueConsumer implements Consumer return _deliveredCount.longValue(); } - public final void send(final MessageInstance entry, final boolean batch) throws AMQException + final void send(final QueueEntry entry, final boolean batch) throws AMQException { _deliveredCount.incrementAndGet(); - _deliveredBytes.addAndGet(entry.getMessage().getSize()); + ServerMessage message = entry.getMessage(); + if(message == null) + { + throw new AMQException("message was null!"); + } + _deliveredBytes.addAndGet(message.getSize()); _target.send(entry, batch); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index d984cf8ab4..6a42088c47 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -22,10 +22,10 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageInstance; -public interface QueueEntry extends MessageInstance, Comparable +public interface QueueEntry extends MessageInstance, Comparable { - AMQQueue getQueue(); + AMQQueue getQueue(); long getSize(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index d39f49a619..788a40bf85 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -63,7 +63,7 @@ public abstract class QueueEntryImpl implements QueueEntry (QueueEntryImpl.class, EntryState.class, "_state"); - private volatile Set> _stateChangeListeners; + private volatile Set, State>> _stateChangeListeners; private static final AtomicReferenceFieldUpdater @@ -138,7 +138,7 @@ public abstract class QueueEntryImpl implements QueueEntry return _entryId; } - public AMQQueue getQueue() + public AMQQueue getQueue() { return _queueEntryList.getQueue(); } @@ -202,7 +202,7 @@ public abstract class QueueEntryImpl implements QueueEntry return acquired; } - public boolean acquire(Consumer sub) + public boolean acquire(QueueConsumer sub) { final boolean acquired = acquire(sub.getOwningState()); if(acquired) @@ -218,7 +218,7 @@ public abstract class QueueEntryImpl implements QueueEntry return (_state instanceof ConsumerAcquiredState); } - public boolean isAcquiredBy(Consumer consumer) + public boolean isAcquiredBy(QueueConsumer consumer) { EntryState state = _state; return state instanceof ConsumerAcquiredState @@ -264,12 +264,12 @@ public abstract class QueueEntryImpl implements QueueEntry return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED)); } - public Consumer getDeliveredConsumer() + public QueueConsumer getDeliveredConsumer() { EntryState state = _state; if (state instanceof ConsumerAcquiredState) { - return ((ConsumerAcquiredState) state).getConsumer(); + return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer(); } else { @@ -279,7 +279,7 @@ public abstract class QueueEntryImpl implements QueueEntry public void reject() { - Consumer consumer = getDeliveredConsumer(); + QueueConsumer consumer = getDeliveredConsumer(); if (consumer != null) { @@ -296,7 +296,7 @@ public abstract class QueueEntryImpl implements QueueEntry } } - public boolean isRejectedBy(Consumer consumer) + public boolean isRejectedBy(QueueConsumer consumer) { if (_rejectedBy != null) // We have consumers that rejected this message @@ -333,7 +333,7 @@ public abstract class QueueEntryImpl implements QueueEntry private void notifyStateChange(final State oldState, final State newState) { - for(StateChangeListener l : _stateChangeListeners) + for(StateChangeListener, State> l : _stateChangeListeners) { l.stateChanged(this, oldState, newState); } @@ -364,7 +364,7 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public int routeToAlternate(final Action action, ServerTransaction txn) + public int routeToAlternate(final Action> action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); @@ -376,7 +376,10 @@ public abstract class QueueEntryImpl implements QueueEntry txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore()); } - int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action); + int enqueues = alternateExchange.send(getMessage(), + getInstanceProperties(), + txn, + action); txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action() { @@ -409,21 +412,21 @@ public abstract class QueueEntryImpl implements QueueEntry return getQueue().isDeleted(); } - public void addStateChangeListener(StateChangeListener listener) + public void addStateChangeListener(StateChangeListener, State> listener) { - Set> listeners = _stateChangeListeners; + Set, State>> listeners = _stateChangeListeners; if(listeners == null) { - _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet>()); + _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet, State>>()); listeners = _stateChangeListeners; } listeners.add(listener); } - public boolean removeStateChangeListener(StateChangeListener listener) + public boolean removeStateChangeListener(StateChangeListener, State> listener) { - Set> listeners = _stateChangeListeners; + Set, State>> listeners = _stateChangeListeners; if(listeners != null) { return listeners.remove(listener); @@ -491,7 +494,7 @@ public abstract class QueueEntryImpl implements QueueEntry @Override public boolean resend() throws AMQException { - Consumer sub = getDeliveredConsumer(); + QueueConsumer sub = getDeliveredConsumer(); if(sub != null) { return sub.resend(this); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 641aaa0a08..ad1f703f51 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage; public interface QueueEntryList { - AMQQueue getQueue(); + AMQQueue getQueue(); Q add(ServerMessage message); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 7435c690b3..9bf237660c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -56,7 +56,7 @@ import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; -public class SimpleAMQQueue implements AMQQueue, +public class SimpleAMQQueue implements AMQQueue, StateChangeListener, MessageGroupManager.ConsumerResetHelper { @@ -525,9 +525,9 @@ public class SimpleAMQQueue implements AMQQueue, } - public Collection getConsumers() + public Collection getConsumers() { - List consumers = new ArrayList(); + List consumers = new ArrayList(); QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator(); while(iter.advance()) { @@ -636,7 +636,7 @@ public class SimpleAMQQueue implements AMQQueue, enqueue(message, null); } - public void enqueue(ServerMessage message, Action action) throws AMQException + public void enqueue(ServerMessage message, Action> action) throws AMQException { incrementQueueCount(); incrementQueueSize(message); @@ -1464,7 +1464,7 @@ public class SimpleAMQQueue implements AMQQueue, } - public void flushConsumer(Consumer sub) throws AMQException + void flushConsumer(QueueConsumer sub) throws AMQException { // Access control if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) @@ -1474,7 +1474,7 @@ public class SimpleAMQQueue implements AMQQueue, flushConsumer(sub, Long.MAX_VALUE); } - public boolean flushConsumer(Consumer sub, long iterations) throws AMQException + boolean flushConsumer(QueueConsumer sub, long iterations) throws AMQException { boolean atTail = false; final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES; @@ -1968,7 +1968,7 @@ public class SimpleAMQQueue implements AMQQueue, return _notificationChecks; } - private final class QueueEntryListener implements StateChangeListener + private final class QueueEntryListener implements StateChangeListener, QueueEntry.State> { private final QueueConsumer _sub; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index b8d8ec19f4..101771c7cc 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -39,7 +39,7 @@ public class SimpleQueueEntryList implements QueueEntryList _queue; static final AtomicReferenceFieldUpdater _nextUpdater = SimpleQueueEntryImpl._nextUpdater; @@ -49,7 +49,7 @@ public class SimpleQueueEntryList implements QueueEntryList _unscavengedHWM = new AtomicReference(); - public SimpleQueueEntryList(AMQQueue queue) + public SimpleQueueEntryList(AMQQueue queue) { _queue = queue; _head = new SimpleQueueEntryImpl(this); @@ -71,7 +71,7 @@ public class SimpleQueueEntryList implements QueueEntryList getQueue() { return _queue; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 3185abc6cd..1f1e50a058 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -50,7 +50,7 @@ public class SortedQueue extends OutOfOrderQueue return _sortedPropertyName; } - public void enqueue(ServerMessage message, Action action) throws AMQException + public void enqueue(ServerMessage message, Action> action) throws AMQException { synchronized (_sortedQueueLock) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index 85559157a9..336ee566eb 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -28,7 +28,7 @@ import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour; * Uses the red/black tree algorithm specified in "Introduction to Algorithms". * ISBN-10: 0262033844 * ISBN-13: 978-0262033848 - * @see http://en.wikipedia.org/wiki/Red-black_tree + * see http://en.wikipedia.org/wiki/Red-black_tree */ public class SortedQueueEntryList implements QueueEntryList { @@ -36,17 +36,17 @@ public class SortedQueueEntryList implements QueueEntryList _queue; private final String _propertyName; - public SortedQueueEntryList(final AMQQueue queue, final String propertyName) + public SortedQueueEntryList(final AMQQueue queue, final String propertyName) { _queue = queue; _head = new SortedQueueEntryImpl(this); _propertyName = propertyName; } - public AMQQueue getQueue() + public AMQQueue getQueue() { return _queue; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java index a688b493e1..de7369f5ed 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; @@ -46,7 +47,7 @@ public class DurableConfigurationStoreHelper Queue.EXCLUSIVE, Queue.ALTERNATE_EXCHANGE)); - public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException + public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException { Map attributesMap = new LinkedHashMap(); attributesMap.put(Queue.NAME, queue.getName()); @@ -71,7 +72,7 @@ public class DurableConfigurationStoreHelper store.update(queue.getId(), QUEUE, attributesMap); } - public static void createQueue(DurableConfigurationStore store, AMQQueue queue) + public static void createQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException { Map attributesMap = new HashMap(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 764549626a..ea9d0ac693 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -25,6 +25,7 @@ import junit.framework.Assert; import org.apache.qpid.AMQException; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -85,7 +86,7 @@ public class TopicExchangeTest extends QpidTestCase public void testDirectMatch() throws AMQException { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -108,7 +109,7 @@ public class TopicExchangeTest extends QpidTestCase public void testStarMatch() throws AMQException { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null)); @@ -139,7 +140,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashMatch() throws AMQException { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null)); @@ -190,7 +191,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMidHash() throws AMQException { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); @@ -216,7 +217,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMatchAfterHash() throws AMQException { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null)); @@ -255,7 +256,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashAfterHash() throws AMQException { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null)); @@ -277,7 +278,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashHash() throws AMQException { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null)); @@ -321,7 +322,7 @@ public class TopicExchangeTest extends QpidTestCase when(message.getMessageNumber()).thenReturn(messageNumber); for(BaseQueue q : queues) { - q.enqueue(message); + q.enqueue(message, null); } return queues.size(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 7092b50c52..ced00dc578 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -51,19 +51,19 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest // Enqueue messages in order SimpleAMQQueue queue = getQueue(); - queue.enqueue(createMessage(1L, (byte) 10)); - queue.enqueue(createMessage(2L, (byte) 4)); - queue.enqueue(createMessage(3L, (byte) 0)); + queue.enqueue(createMessage(1L, (byte) 10), null); + queue.enqueue(createMessage(2L, (byte) 4), null); + queue.enqueue(createMessage(3L, (byte) 0), null); // Enqueue messages in reverse order - queue.enqueue(createMessage(4L, (byte) 0)); - queue.enqueue(createMessage(5L, (byte) 4)); - queue.enqueue(createMessage(6L, (byte) 10)); + queue.enqueue(createMessage(4L, (byte) 0), null); + queue.enqueue(createMessage(5L, (byte) 4), null); + queue.enqueue(createMessage(6L, (byte) 10), null); // Enqueue messages out of order - queue.enqueue(createMessage(7L, (byte) 4)); - queue.enqueue(createMessage(8L, (byte) 10)); - queue.enqueue(createMessage(9L, (byte) 0)); + queue.enqueue(createMessage(7L, (byte) 4), null); + queue.enqueue(createMessage(8L, (byte) 10), null); + queue.enqueue(createMessage(9L, (byte) 0), null); // Register subscriber queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(Consumer.Option.class)); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 1158781beb..e9c9a0305f 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -43,7 +43,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; -public class MockAMQQueue implements AMQQueue +public class MockAMQQueue implements AMQQueue { private boolean _deleted = false; private String _name; @@ -208,7 +208,7 @@ public class MockAMQQueue implements AMQQueue } @Override - public Consumer addConsumer(final ConsumerTarget target, + public QueueConsumer addConsumer(final ConsumerTarget target, final FilterManager filters, final Class messageClass, final String consumerName, @@ -226,7 +226,7 @@ public class MockAMQQueue implements AMQQueue - public Collection getConsumers() + public Collection getConsumers() { return Collections.emptyList(); } @@ -306,7 +306,7 @@ public class MockAMQQueue implements AMQQueue { } - public void enqueue(ServerMessage message, Action action) throws AMQException + public void enqueue(ServerMessage message, Action> action) throws AMQException { } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 95a6030d6a..7ab7a07ab6 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -42,7 +42,7 @@ public class MockQueueEntry implements QueueEntry return false; } - public boolean acquire(Consumer sub) + public boolean acquire(QueueConsumer sub) { return false; } @@ -58,12 +58,12 @@ public class MockQueueEntry implements QueueEntry return false; } - public boolean isAcquiredBy(Consumer consumer) + public boolean isAcquiredBy(QueueConsumer consumer) { return false; } - public void addStateChangeListener(StateChangeListener listener) + public void addStateChangeListener(StateChangeListener, State> listener) { } @@ -73,7 +73,7 @@ public class MockQueueEntry implements QueueEntry } - public int routeToAlternate(final Action action, final ServerTransaction txn) + public int routeToAlternate(final Action> action, final ServerTransaction txn) { return 0; } @@ -88,7 +88,7 @@ public class MockQueueEntry implements QueueEntry return false; } - public Consumer getDeliveredConsumer() + public QueueConsumer getDeliveredConsumer() { return null; } @@ -103,7 +103,7 @@ public class MockQueueEntry implements QueueEntry return _message; } - public AMQQueue getQueue() + public AMQQueue getQueue() { return null; } @@ -126,7 +126,7 @@ public class MockQueueEntry implements QueueEntry } - public boolean isRejectedBy(Consumer consumer) + public boolean isRejectedBy(QueueConsumer consumer) { return false; @@ -153,7 +153,7 @@ public class MockQueueEntry implements QueueEntry } - public boolean removeStateChangeListener(StateChangeListener listener) + public boolean removeStateChangeListener(StateChangeListener, State> listener) { return false; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 0b2c6c900c..95139d8740 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -21,11 +21,11 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageInstance.EntryState; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.protocol.AMQSessionModel; import java.lang.reflect.Field; @@ -113,16 +113,16 @@ public abstract class QueueEntryImplTestBase extends TestCase */ private void acquire() { - _queueEntry.acquire(newMockConsumer()); + _queueEntry.acquire(newConsumer()); assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", _queueEntry.isAcquired()); } - private Consumer newMockConsumer() + private QueueConsumer newConsumer() { - final Consumer consumer = mock(Consumer.class); - when(consumer.getOwningState()).thenReturn(new MessageInstance.ConsumerAcquiredState(consumer)); - when(consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement()); + final ConsumerTarget target = mock(ConsumerTarget.class); + when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class)); + final QueueConsumer consumer = new QueueConsumer(null,null,true,true,"mock",false,target); return consumer; } @@ -153,7 +153,7 @@ public abstract class QueueEntryImplTestBase extends TestCase */ public void testRejectAndRejectedBy() { - Consumer sub = newMockConsumer(); + QueueConsumer sub = newConsumer(); assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub)); assertFalse("Queue entry should not yet have been acquired by a consumer", _queueEntry.isAcquired()); @@ -167,7 +167,7 @@ public abstract class QueueEntryImplTestBase extends TestCase assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub)); //repeat rejection using a second consumer - Consumer sub2 = newMockConsumer(); + QueueConsumer sub2 = newConsumer(); assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2)); assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2)); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 62120f26d3..f2b53f95c3 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -168,14 +168,15 @@ public class SimpleAMQQueueTest extends QpidTestCase // Check adding a consumer adds it to the queue _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber - _queue.enqueue(messageA); + _queue.enqueue(messageA, null); try { Thread.sleep(2000L); @@ -194,7 +195,7 @@ public class SimpleAMQQueueTest extends QpidTestCase 1 == _queue.getActiveConsumerCount()); ServerMessage messageB = createMessage(new Long (25)); - _queue.enqueue(messageB); + _queue.enqueue(messageB, null); assertNull(_consumer.getQueueContext()); } @@ -202,9 +203,10 @@ public class SimpleAMQQueueTest extends QpidTestCase public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException { ServerMessage messageA = createMessage(new Long(24)); - _queue.enqueue(messageA); + _queue.enqueue(messageA, null); _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); Thread.sleep(150); assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after an enqueue", @@ -218,10 +220,11 @@ public class SimpleAMQQueueTest extends QpidTestCase { ServerMessage messageA = createMessage(new Long(24)); ServerMessage messageB = createMessage(new Long(25)); - _queue.enqueue(messageA); - _queue.enqueue(messageB); + _queue.enqueue(messageA, null); + _queue.enqueue(messageB, null); _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); Thread.sleep(150); assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after enqueues", @@ -245,13 +248,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.SEES_REQUEUES)); final ArrayList queueEntries = new ArrayList(); - Action postEnqueueAction = new Action() - { - public void performAction(MessageInstance entry) - { - queueEntries.add((QueueEntry) entry); - } - }; + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); /* Enqueue three messages */ @@ -298,13 +295,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.ACQUIRES)); final ArrayList queueEntries = new ArrayList(); - Action postEnqueueAction = new Action() - { - public void performAction(MessageInstance entry) - { - queueEntries.add((QueueEntry) entry); - } - }; + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); /* Enqueue one message with expiration set for a short time in the future */ @@ -356,13 +347,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.SEES_REQUEUES)); final ArrayList queueEntries = new ArrayList(); - Action postEnqueueAction = new Action() - { - public void performAction(MessageInstance entry) - { - queueEntries.add((QueueEntry) entry); - } - }; + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); /* Enqueue three messages */ @@ -420,14 +405,7 @@ public class SimpleAMQQueueTest extends QpidTestCase final ArrayList queueEntries = new ArrayList(); - Action postEnqueueAction = new Action() - { - public void performAction(MessageInstance entry) - { - queueEntries.add((QueueEntry)entry); - } - }; - + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); /* Enqueue two messages */ @@ -460,7 +438,8 @@ public class SimpleAMQQueueTest extends QpidTestCase // Check adding an exclusive consumer adds it to the queue _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.EXCLUSIVE)); + EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); @@ -468,7 +447,7 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber - _queue.enqueue(messageA); + _queue.enqueue(messageA, null); try { Thread.sleep(2000L); @@ -485,7 +464,8 @@ public class SimpleAMQQueueTest extends QpidTestCase { _queue.addConsumer(subB, null, messageA.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); } catch (AMQException e) @@ -498,7 +478,8 @@ public class SimpleAMQQueueTest extends QpidTestCase // existing consumer _consumer.close(); _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); try { @@ -522,9 +503,10 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage message = createMessage(new Long(25)); _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); - _queue.enqueue(message); + _queue.enqueue(message, null); _consumer.close(); assertTrue("Queue was not deleted when consumer was removed", _queue.isDeleted()); @@ -536,12 +518,27 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage message = createMessage(id); _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + + _queue.enqueue(message, new Action>() + { + @Override + public void performAction(final MessageInstance object) + { + QueueEntry entry = (QueueEntry) object; + entry.setRedelivered(); + try + { + _consumer.resend(entry); + } + catch (AMQException e) + { + fail("Exception thrown: " + e.getMessage()); + } + } + }); + - _queue.enqueue(message); - QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry(); - entry.setRedelivered(); - _consumer.resend(entry); } @@ -552,7 +549,7 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); // Get message id Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); @@ -568,7 +565,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); } // Get message ids List msgids = _queue.getMessagesOnTheQueue(5); @@ -589,7 +586,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); } // Get message ids List msgids = _queue.getMessagesOnTheQueue(5, 5); @@ -610,7 +607,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message); + _queue.enqueue(message, null); } // Get non-existent 0th QueueEntry & check returned list was empty @@ -953,7 +950,8 @@ public class SimpleAMQQueueTest extends QpidTestCase null, entries.get(0).getMessage().getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); // process queue testQueue.processQueue(new QueueRunner(testQueue) @@ -1018,7 +1016,7 @@ public class SimpleAMQQueueTest extends QpidTestCase } @Override - public boolean acquire(Consumer sub) + public boolean acquire(QueueConsumer sub) { if(message.getMessageNumber() % 2 == 0) { @@ -1044,7 +1042,8 @@ public class SimpleAMQQueueTest extends QpidTestCase null, createMessage(-1l).getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); } catch (AMQException e) { @@ -1077,7 +1076,8 @@ public class SimpleAMQQueueTest extends QpidTestCase null, createMessage(-1l).getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify adding an inactive consumer doesn't increase the count @@ -1089,7 +1089,8 @@ public class SimpleAMQQueueTest extends QpidTestCase null, createMessage(-1l).getClass(), "test", - EnumSet.noneOf(Consumer.Option.class)); + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify behaviour in face of expected state changes: @@ -1133,10 +1134,10 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.setNotificationListener(listener); _queue.setMaximumMessageCount(2); - _queue.enqueue(createMessage(new Long(24))); + _queue.enqueue(createMessage(new Long(24)), null); verifyZeroInteractions(listener); - _queue.enqueue(createMessage(new Long(25))); + _queue.enqueue(createMessage(new Long(25)), null); verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); } @@ -1145,9 +1146,9 @@ public class SimpleAMQQueueTest extends QpidTestCase { AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class); - _queue.enqueue(createMessage(new Long(24))); - _queue.enqueue(createMessage(new Long(25))); - _queue.enqueue(createMessage(new Long(26))); + _queue.enqueue(createMessage(new Long(24)), null); + _queue.enqueue(createMessage(new Long(25)), null); + _queue.enqueue(createMessage(new Long(26)), null); _queue.setNotificationListener(listener); _queue.setMaximumMessageCount(2); @@ -1309,6 +1310,21 @@ public class SimpleAMQQueueTest extends QpidTestCase return message; } + private static class EntryListAddingAction implements Action> + { + private final ArrayList _queueEntries; + + public EntryListAddingAction(final ArrayList queueEntries) + { + _queueEntries = queueEntries; + } + + public void performAction(MessageInstance entry) + { + _queueEntries.add((QueueEntry) entry); + } + } + class TestSimpleQueueEntryListFactory implements QueueEntryListFactory { QueueEntryList _list; diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index b5c4724292..53022c333e 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -105,18 +106,7 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - private final Action _checkCapacityAction = new Action() - { - @Override - public void performAction(final MessageInstance entry) - { - TransactionLogResource queue = entry.getOwningResource(); - if(queue instanceof CapacityChecker) - { - ((CapacityChecker)queue).checkCapacity(ServerSession.this); - } - } - }; + private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction(); public static interface MessageDispositionChangeListener { @@ -938,4 +928,16 @@ public class ServerSession extends Session return getId().compareTo(o.getId()); } + private class CheckCapacityAction implements Action> + { + @Override + public void performAction(final MessageInstance entry) + { + TransactionLogResource queue = entry.getOwningResource(); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(ServerSession.this); + } + } + } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index dc9a6484fa..c7029e83a2 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1192,14 +1192,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - private class ImmediateAction implements Action + private class ImmediateAction implements Action> { public ImmediateAction() { } - public void performAction(MessageInstance entry) + public void performAction(MessageInstance entry) { TransactionLogResource queue = entry.getOwningResource(); @@ -1258,10 +1258,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - private final class CapacityCheckAction implements Action + private final class CapacityCheckAction implements Action> { @Override - public void performAction(final MessageInstance entry) + public void performAction(final MessageInstance entry) { TransactionLogResource queue = entry.getOwningResource(); if(queue instanceof CapacityChecker) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java index e8c922b023..aa5a75396a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java @@ -62,7 +62,7 @@ public class ExtractResendAndRequeueTest extends TestCase private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap; private static final int INITIAL_MSG_COUNT = 10; private AMQQueue _queue; - private LinkedList _referenceList = new LinkedList(); + private LinkedList _referenceList = new LinkedList(); private Consumer _consumer; private boolean _queueDeleted; @@ -115,11 +115,11 @@ public class ExtractResendAndRequeueTest extends TestCase * * @return Subscription that performed the acquire */ - private void acquireMessages(LinkedList messageList) + private void acquireMessages(LinkedList messageList) { // Acquire messages in subscription - for(QueueEntry entry : messageList) + for(MessageInstance entry : messageList) { when(entry.getDeliveredConsumer()).thenReturn(_consumer); } -- cgit v1.2.1