summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java')
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java518
1 files changed, 518 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
new file mode 100644
index 0000000000..a9b99503ec
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -0,0 +1,518 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.qpid.server.consumer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.filter.SimpleFilterManager;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MockConsumer implements ConsumerTarget
+{
+
+ private final List<String> _messageIds;
+ private boolean _closed = false;
+ private String tag = "mocktag";
+ private AMQQueue queue = null;
+ private StateChangeListener<ConsumerTarget, State> _listener = null;
+ private State _state = State.ACTIVE;
+ private ArrayList<MessageInstance> messages = new ArrayList<MessageInstance>();
+ private final Lock _stateChangeLock = new ReentrantLock();
+
+ private boolean _isActive = true;
+
+ public MockConsumer()
+ {
+ _messageIds = null;
+ }
+
+ public MockConsumer(List<String> messageIds)
+ {
+ _messageIds = messageIds;
+ }
+
+ public boolean close()
+ {
+ _closed = true;
+ if (_listener != null)
+ {
+ _listener.stateChanged(this, _state, State.CLOSED);
+ }
+ _state = State.CLOSED;
+ return true;
+ }
+
+ public String getName()
+ {
+ return tag;
+ }
+
+ public FilterManager getFilters()
+ {
+ if(_messageIds != null)
+ {
+ SimpleFilterManager filters = new SimpleFilterManager();
+ filters.add(new MessageFilter()
+ {
+ @Override
+ public boolean matches(final Filterable message)
+ {
+ final String messageId = message.getMessageHeader().getMessageId();
+ return _messageIds.contains(messageId);
+ }
+ });
+ return filters;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return 0; // TODO - Implement
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return 0; // TODO - Implement
+ }
+
+ public AMQQueue getQueue()
+ {
+ return queue;
+ }
+
+ public AMQSessionModel getSessionModel()
+ {
+ return new MockSessionModel();
+ }
+
+ public boolean isActive()
+ {
+ return _isActive ;
+ }
+
+
+
+ public boolean isClosed()
+ {
+ return _closed;
+ }
+
+
+ public boolean isSuspended()
+ {
+ return false;
+ }
+
+ public void queueDeleted()
+ {
+ }
+
+ public void restoreCredit(ServerMessage message)
+ {
+ }
+
+ public void send(MessageInstance entry, boolean batch) throws AMQException
+ {
+ if (messages.contains(entry))
+ {
+ entry.setRedelivered();
+ }
+ messages.add(entry);
+ }
+
+ public void flushBatched()
+ {
+
+ }
+
+ public State getState()
+ {
+ return _state;
+ }
+
+ @Override
+ public void consumerAdded(final Consumer sub)
+ {
+ }
+
+ @Override
+ public void consumerRemoved(final Consumer sub)
+ {
+
+ }
+
+ public void setState(State state)
+ {
+ State oldState = _state;
+ _state = state;
+ if(_listener != null)
+ {
+ _listener.stateChanged(this, oldState, state);
+ }
+ }
+
+ @Override
+ public void setStateListener(final StateChangeListener<ConsumerTarget, State> listener)
+ {
+ _listener = listener;
+ }
+
+ public ArrayList<MessageInstance> getMessages()
+ {
+ return messages;
+ }
+
+
+ public void queueEmpty() throws AMQException
+ {
+ }
+
+ @Override
+ public boolean allocateCredit(final ServerMessage msg)
+ {
+ return true;
+ }
+
+ public void setActive(final boolean isActive)
+ {
+ _isActive = isActive;
+ }
+
+ private static class MockSessionModel implements AMQSessionModel
+ {
+ private final UUID _id = UUID.randomUUID();
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public AMQConnectionModel getConnectionModel()
+ {
+ return new MockConnectionModel();
+ }
+
+ @Override
+ public String getClientID()
+ {
+ return null;
+ }
+
+ @Override
+ public void close() throws AMQException
+ {
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return null;
+ }
+
+ @Override
+ public void checkTransactionStatus(long openWarn, long openClose,
+ long idleWarn, long idleClose) throws AMQException
+ {
+ }
+
+ @Override
+ public void block(AMQQueue queue)
+ {
+ }
+
+ @Override
+ public void unblock(AMQQueue queue)
+ {
+ }
+
+ @Override
+ public void block()
+ {
+ }
+
+ @Override
+ public void unblock()
+ {
+ }
+
+ @Override
+ public boolean getBlocking()
+ {
+ return false;
+ }
+
+ @Override
+ public Object getConnectionReference()
+ {
+ return this;
+ }
+
+ @Override
+ public int getUnacknowledgedMessageCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public Long getTxnCount()
+ {
+ return null;
+ }
+
+ @Override
+ public Long getTxnStart()
+ {
+ return null;
+ }
+
+ @Override
+ public Long getTxnCommits()
+ {
+ return null;
+ }
+
+ @Override
+ public Long getTxnRejects()
+ {
+ return null;
+ }
+
+ @Override
+ public int getChannelId()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getConsumerCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public int compareTo(AMQSessionModel o)
+ {
+ return getId().compareTo(o.getId());
+ }
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ }
+ }
+
+ private static class MockConnectionModel implements AMQConnectionModel
+ {
+ @Override
+ public void initialiseStatistics()
+ {
+ }
+
+ @Override
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ }
+
+ @Override
+ public void registerMessageDelivered(long messageSize)
+ {
+ }
+
+ @Override
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return null;
+ }
+
+ @Override
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return null;
+ }
+
+ @Override
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return null;
+ }
+
+ @Override
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return null;
+ }
+
+ @Override
+ public void resetStatistics()
+ {
+
+ }
+
+ @Override
+ public void close(AMQConstant cause, String message)
+ throws AMQException
+ {
+ }
+
+ @Override
+ public void closeSession(AMQSessionModel session, AMQConstant cause,
+ String message) throws AMQException
+ {
+ }
+
+ @Override
+ public long getConnectionId()
+ {
+ return 0;
+ }
+
+ @Override
+ public List<AMQSessionModel> getSessionModels()
+ {
+ return null;
+ }
+
+ @Override
+ public void block()
+ {
+ }
+
+ @Override
+ public void unblock()
+ {
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return null;
+ }
+
+ @Override
+ public String getUserName()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isSessionNameUnique(byte[] name)
+ {
+ return false;
+ }
+
+ @Override
+ public String getRemoteAddressString()
+ {
+ return "remoteAddress:1234";
+ }
+
+ @Override
+ public String getClientId()
+ {
+ return null;
+ }
+
+ @Override
+ public String getClientVersion()
+ {
+ return null;
+ }
+
+ @Override
+ public String getClientProduct()
+ {
+ return null;
+ }
+
+ @Override
+ public String getPrincipalAsString()
+ {
+ return null;
+ }
+
+ @Override
+ public long getSessionCountLimit()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getLastIoTime()
+ {
+ return 0;
+ }
+
+ @Override
+ public Port getPort()
+ {
+ return null;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return null;
+ }
+
+ @Override
+ public void stop()
+ {
+ }
+
+ @Override
+ public boolean isStopped()
+ {
+ return false;
+ }
+
+ @Override
+ public String getVirtualHostName()
+ {
+ return null;
+ }
+ }
+}