summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java77
1 files changed, 70 insertions, 7 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 96df463481..7daebbff04 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -21,6 +21,8 @@
package org.apache.qpid.client;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.Map;
@@ -80,7 +82,7 @@ import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
+public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -96,8 +98,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
*/
- AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ protected AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
@@ -150,7 +152,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
}
- getProtocolHandler().writeFrame(ackFrame);
+ getProtocolHandler().writeFrame(ackFrame, !isTransacted());
_unacknowledgedMessageTags.remove(deliveryTag);
}
@@ -512,7 +514,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
_messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache);
AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
AMQShortString reason = msg.getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
@@ -572,6 +574,16 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
}, _connection).execute();
}
+ public DestinationCache<AMQQueue> getQueueDestinationCache()
+ {
+ return _queueDestinationCache;
+ }
+
+ public DestinationCache<AMQTopic> getTopicDestinationCache()
+ {
+ return _topicDestinationCache;
+ }
+
class QueueDeclareOkHandler extends SpecificMethodFrameListener
{
@@ -613,12 +625,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
return okHandler._messageCount;
}
- protected final boolean tagLE(long tag1, long tag2)
+ protected boolean tagLE(long tag1, long tag2)
{
return tag1 <= tag2;
}
- protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ protected boolean updateRollbackMark(long currentMark, long deliveryTag)
{
return false;
}
@@ -695,4 +707,55 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
return null;
}
}
+
+ public abstract static class DestinationCache<T extends AMQDestination>
+ {
+ private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>();
+
+ public T getDestination(AMQShortString exchangeName, AMQShortString routingKey)
+ {
+ Map<AMQShortString, T> routingMap = cache.get(exchangeName);
+ if(routingMap == null)
+ {
+ routingMap = new LinkedHashMap<AMQShortString, T>()
+ {
+
+ protected boolean removeEldestEntry(Map.Entry<AMQShortString, T> eldest)
+ {
+ return size() >= 200;
+ }
+ };
+ cache.put(exchangeName,routingMap);
+ }
+ T destination = routingMap.get(routingKey);
+ if(destination == null)
+ {
+ destination = newDestination(exchangeName, routingKey);
+ routingMap.put(routingKey,destination);
+ }
+ return destination;
+ }
+
+ protected abstract T newDestination(AMQShortString exchangeName, AMQShortString routingKey);
+ }
+
+ private static class TopicDestinationCache extends DestinationCache<AMQTopic>
+ {
+ protected AMQTopic newDestination(AMQShortString exchangeName, AMQShortString routingKey)
+ {
+ return new AMQTopic(exchangeName, routingKey, null);
+ }
+ }
+
+ private static class QueueDestinationCache extends DestinationCache<AMQQueue>
+ {
+ protected AMQQueue newDestination(AMQShortString exchangeName, AMQShortString routingKey)
+ {
+ return new AMQQueue(exchangeName, routingKey, routingKey);
+ }
+ }
+
+ private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache();
+ private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache();
+
}