diff options
Diffstat (limited to 'java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java')
-rw-r--r-- | java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java | 167 |
1 files changed, 54 insertions, 113 deletions
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index eec1edca35..2e52d47326 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -23,6 +23,10 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.filter.MessageFilter; +import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Port; @@ -42,36 +46,34 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public class MockSubscription implements Subscription +public class MockSubscription implements SubscriptionTarget { + private final List<String> _messageIds; private boolean _closed = false; private String tag = "mocktag"; private AMQQueue queue = null; - private StateChangeListener<Subscription, State> _listener = null; - private volatile AMQQueue.Context _queueContext = null; + private StateChangeListener<SubscriptionTarget, State> _listener = null; private State _state = State.ACTIVE; private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); private final Lock _stateChangeLock = new ReentrantLock(); - private List<QueueEntry> _acceptEntries = null; - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private static final AtomicLong idGenerator = new AtomicLong(0); // Create a simple ID that increments for ever new Subscription - private final long _subscriptionID = idGenerator.getAndIncrement(); private boolean _isActive = true; + private Subscription _subscription; public MockSubscription() { + _messageIds = null; } - public MockSubscription(List<QueueEntry> acceptEntries) + public MockSubscription(List<String> messageIds) { - _acceptEntries = acceptEntries; + _messageIds = messageIds; } - public void close() + public boolean close() { _closed = true; if (_listener != null) @@ -79,6 +81,7 @@ public class MockSubscription implements Subscription _listener.stateChanged(this, _state, State.CLOSED); } _state = State.CLOSED; + return true; } public String getName() @@ -86,45 +89,26 @@ public class MockSubscription implements Subscription return tag; } - @Override - public void flush() throws AMQException - { - - } - - public long getSubscriptionID() - { - return _subscriptionID; - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public LogActor getLogActor() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isTransient() - { - return false; - } - - public long getBytesOut() - { - return 0; // TODO - Implement - } - - public long getMessagesOut() + public FilterManager getFilters() { - return 0; // TODO - Implement + if(_messageIds != null) + { + SimpleFilterManager filters = new SimpleFilterManager(); + filters.add(new MessageFilter() + { + @Override + public boolean matches(final Filterable message) + { + final String messageId = message.getMessageHeader().getMessageId(); + return _messageIds.contains(messageId); + } + }); + return filters; + } + else + { + return null; + } } public long getUnacknowledgedBytes() @@ -147,62 +131,18 @@ public class MockSubscription implements Subscription return new MockSessionModel(); } - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public boolean hasInterest(QueueEntry entry) - { - if(_acceptEntries != null) - { - //simulate selector behaviour, only signal - //interest in the dictated queue entries - return _acceptEntries.contains(entry); - } - - return true; - } - public boolean isActive() { return _isActive ; } - public void set(String key, Object value) - { - } - public Object get(String key) - { - return null; - } - - public boolean isAutoClose() - { - return false; - } public boolean isClosed() { return _closed; } - public boolean acquires() - { - return true; - } - - public boolean seesRequeues() - { - return true; - } public boolean isSuspended() { @@ -213,11 +153,6 @@ public class MockSubscription implements Subscription { } - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - public void restoreCredit(QueueEntry queueEntry) { } @@ -236,33 +171,37 @@ public class MockSubscription implements Subscription } - public void setQueueContext(AMQQueue.Context queueContext) + public State getState() { - _queueContext = queueContext; + return _state; } - public void setQueue(AMQQueue queue, boolean exclusive) + @Override + public void subscriptionRegistered(final Subscription sub) { - this.queue = queue; + _subscription = sub; } - public void setNoLocal(boolean noLocal) + @Override + public void subscriptionRemoved(final Subscription sub) { - } - public void setStateListener(StateChangeListener<Subscription, State> listener) - { - this._listener = listener; } - public State getState() + public void setState(State state) { - return _state; + State oldState = _state; + _state = state; + if(_listener != null) + { + _listener.stateChanged(this, oldState, state); + } } - public boolean wouldSuspend(QueueEntry msg) + @Override + public void setStateListener(final StateChangeListener<SubscriptionTarget, State> listener) { - return false; + _listener = listener; } public ArrayList<QueueEntry> getMessages() @@ -270,13 +209,15 @@ public class MockSubscription implements Subscription return messages; } - public boolean isSessionTransactional() + + public void queueEmpty() throws AMQException { - return false; } - public void queueEmpty() throws AMQException + @Override + public boolean allocateCredit(final QueueEntry msg) { + return true; } public void setActive(final boolean isActive) |