summaryrefslogtreecommitdiff
path: root/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java')
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java167
1 files changed, 54 insertions, 113 deletions
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index eec1edca35..2e52d47326 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -23,6 +23,10 @@ package org.apache.qpid.server.subscription;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Port;
@@ -42,36 +46,34 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-public class MockSubscription implements Subscription
+public class MockSubscription implements SubscriptionTarget
{
+ private final List<String> _messageIds;
private boolean _closed = false;
private String tag = "mocktag";
private AMQQueue queue = null;
- private StateChangeListener<Subscription, State> _listener = null;
- private volatile AMQQueue.Context _queueContext = null;
+ private StateChangeListener<SubscriptionTarget, State> _listener = null;
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
- private List<QueueEntry> _acceptEntries = null;
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private static final AtomicLong idGenerator = new AtomicLong(0);
// Create a simple ID that increments for ever new Subscription
- private final long _subscriptionID = idGenerator.getAndIncrement();
private boolean _isActive = true;
+ private Subscription _subscription;
public MockSubscription()
{
+ _messageIds = null;
}
- public MockSubscription(List<QueueEntry> acceptEntries)
+ public MockSubscription(List<String> messageIds)
{
- _acceptEntries = acceptEntries;
+ _messageIds = messageIds;
}
- public void close()
+ public boolean close()
{
_closed = true;
if (_listener != null)
@@ -79,6 +81,7 @@ public class MockSubscription implements Subscription
_listener.stateChanged(this, _state, State.CLOSED);
}
_state = State.CLOSED;
+ return true;
}
public String getName()
@@ -86,45 +89,26 @@ public class MockSubscription implements Subscription
return tag;
}
- @Override
- public void flush() throws AMQException
- {
-
- }
-
- public long getSubscriptionID()
- {
- return _subscriptionID;
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public LogActor getLogActor()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isTransient()
- {
- return false;
- }
-
- public long getBytesOut()
- {
- return 0; // TODO - Implement
- }
-
- public long getMessagesOut()
+ public FilterManager getFilters()
{
- return 0; // TODO - Implement
+ if(_messageIds != null)
+ {
+ SimpleFilterManager filters = new SimpleFilterManager();
+ filters.add(new MessageFilter()
+ {
+ @Override
+ public boolean matches(final Filterable message)
+ {
+ final String messageId = message.getMessageHeader().getMessageId();
+ return _messageIds.contains(messageId);
+ }
+ });
+ return filters;
+ }
+ else
+ {
+ return null;
+ }
}
public long getUnacknowledgedBytes()
@@ -147,62 +131,18 @@ public class MockSubscription implements Subscription
return new MockSessionModel();
}
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
-
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public boolean hasInterest(QueueEntry entry)
- {
- if(_acceptEntries != null)
- {
- //simulate selector behaviour, only signal
- //interest in the dictated queue entries
- return _acceptEntries.contains(entry);
- }
-
- return true;
- }
-
public boolean isActive()
{
return _isActive ;
}
- public void set(String key, Object value)
- {
- }
- public Object get(String key)
- {
- return null;
- }
-
- public boolean isAutoClose()
- {
- return false;
- }
public boolean isClosed()
{
return _closed;
}
- public boolean acquires()
- {
- return true;
- }
-
- public boolean seesRequeues()
- {
- return true;
- }
public boolean isSuspended()
{
@@ -213,11 +153,6 @@ public class MockSubscription implements Subscription
{
}
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
public void restoreCredit(QueueEntry queueEntry)
{
}
@@ -236,33 +171,37 @@ public class MockSubscription implements Subscription
}
- public void setQueueContext(AMQQueue.Context queueContext)
+ public State getState()
{
- _queueContext = queueContext;
+ return _state;
}
- public void setQueue(AMQQueue queue, boolean exclusive)
+ @Override
+ public void subscriptionRegistered(final Subscription sub)
{
- this.queue = queue;
+ _subscription = sub;
}
- public void setNoLocal(boolean noLocal)
+ @Override
+ public void subscriptionRemoved(final Subscription sub)
{
- }
- public void setStateListener(StateChangeListener<Subscription, State> listener)
- {
- this._listener = listener;
}
- public State getState()
+ public void setState(State state)
{
- return _state;
+ State oldState = _state;
+ _state = state;
+ if(_listener != null)
+ {
+ _listener.stateChanged(this, oldState, state);
+ }
}
- public boolean wouldSuspend(QueueEntry msg)
+ @Override
+ public void setStateListener(final StateChangeListener<SubscriptionTarget, State> listener)
{
- return false;
+ _listener = listener;
}
public ArrayList<QueueEntry> getMessages()
@@ -270,13 +209,15 @@ public class MockSubscription implements Subscription
return messages;
}
- public boolean isSessionTransactional()
+
+ public void queueEmpty() throws AMQException
{
- return false;
}
- public void queueEmpty() throws AMQException
+ @Override
+ public boolean allocateCredit(final QueueEntry msg)
{
+ return true;
}
public void setActive(final boolean isActive)