diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java | 45 |
1 files changed, 45 insertions, 0 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 5a411c6807..3c76252cb2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -43,6 +43,9 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionImpl; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TestNetworkConnection; @@ -120,6 +123,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { } + public ClientDeliveryMethod createDeliveryMethod(int channelId) + { + return new InternalWriteDeliverMethod(channelId); + } + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) { } @@ -213,4 +221,41 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr ((AMQChannel)session).getProtocolSession().closeSession(); } + + private class InternalWriteDeliverMethod implements ClientDeliveryMethod + { + private int _channelId; + + public InternalWriteDeliverMethod(int channelId) + { + _channelId = channelId; + } + + + public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException + { + _deliveryCount.incrementAndGet(); + + synchronized (_channelDelivers) + { + Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId); + + if (consumers == null) + { + consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); + _channelDelivers.put(_channelId, consumers); + } + + LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag()); + + if (consumerDelivers == null) + { + consumerDelivers = new LinkedList<DeliveryPair>(); + consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers); + } + + consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage())); + } + } + } } |