diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 77 |
1 files changed, 70 insertions, 7 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 96df463481..7daebbff04 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,6 +21,8 @@ package org.apache.qpid.client; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.ArrayList; import java.util.Map; @@ -80,7 +82,7 @@ import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> +public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -96,8 +98,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. */ - AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) + protected AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, + MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark); @@ -150,7 +152,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); } - getProtocolHandler().writeFrame(ackFrame); + getProtocolHandler().writeFrame(ackFrame, !isTransacted()); _unacknowledgedMessageTags.remove(deliveryTag); } @@ -512,7 +514,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); + msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache); AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); AMQShortString reason = msg.getReplyText(); _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); @@ -572,6 +574,16 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B }, _connection).execute(); } + public DestinationCache<AMQQueue> getQueueDestinationCache() + { + return _queueDestinationCache; + } + + public DestinationCache<AMQTopic> getTopicDestinationCache() + { + return _topicDestinationCache; + } + class QueueDeclareOkHandler extends SpecificMethodFrameListener { @@ -613,12 +625,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B return okHandler._messageCount; } - protected final boolean tagLE(long tag1, long tag2) + protected boolean tagLE(long tag1, long tag2) { return tag1 <= tag2; } - protected final boolean updateRollbackMark(long currentMark, long deliveryTag) + protected boolean updateRollbackMark(long currentMark, long deliveryTag) { return false; } @@ -695,4 +707,55 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B return null; } } + + public abstract static class DestinationCache<T extends AMQDestination> + { + private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>(); + + public T getDestination(AMQShortString exchangeName, AMQShortString routingKey) + { + Map<AMQShortString, T> routingMap = cache.get(exchangeName); + if(routingMap == null) + { + routingMap = new LinkedHashMap<AMQShortString, T>() + { + + protected boolean removeEldestEntry(Map.Entry<AMQShortString, T> eldest) + { + return size() >= 200; + } + }; + cache.put(exchangeName,routingMap); + } + T destination = routingMap.get(routingKey); + if(destination == null) + { + destination = newDestination(exchangeName, routingKey); + routingMap.put(routingKey,destination); + } + return destination; + } + + protected abstract T newDestination(AMQShortString exchangeName, AMQShortString routingKey); + } + + private static class TopicDestinationCache extends DestinationCache<AMQTopic> + { + protected AMQTopic newDestination(AMQShortString exchangeName, AMQShortString routingKey) + { + return new AMQTopic(exchangeName, routingKey, null); + } + } + + private static class QueueDestinationCache extends DestinationCache<AMQQueue> + { + protected AMQQueue newDestination(AMQShortString exchangeName, AMQShortString routingKey) + { + return new AMQQueue(exchangeName, routingKey, routingKey); + } + } + + private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache(); + private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache(); + } |