summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java45
1 files changed, 45 insertions, 0 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 5a411c6807..3c76252cb2 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -43,6 +43,9 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TestNetworkConnection;
@@ -120,6 +123,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
}
+ public ClientDeliveryMethod createDeliveryMethod(int channelId)
+ {
+ return new InternalWriteDeliverMethod(channelId);
+ }
+
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
}
@@ -213,4 +221,41 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
((AMQChannel)session).getProtocolSession().closeSession();
}
+
+ private class InternalWriteDeliverMethod implements ClientDeliveryMethod
+ {
+ private int _channelId;
+
+ public InternalWriteDeliverMethod(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+
+ public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException
+ {
+ _deliveryCount.incrementAndGet();
+
+ synchronized (_channelDelivers)
+ {
+ Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
+
+ if (consumers == null)
+ {
+ consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ _channelDelivers.put(_channelId, consumers);
+ }
+
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag());
+
+ if (consumerDelivers == null)
+ {
+ consumerDelivers = new LinkedList<DeliveryPair>();
+ consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
+ }
+
+ consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
+ }
+ }
+ }
}