diff options
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 66 |
1 files changed, 55 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 93724c9920..700222dabb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -202,11 +202,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */ private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); + /** System property to enable strickt AMQP compliance */ + public static final String STRICT_AMQP = "STRICT_AMQP"; + /** Strickt AMQP default */ + public static final String STRICT_AMQP_DEFAULT = "false"; + + private final boolean _strictAMQP; + + /** System property to enable immediate message prefetching */ public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; /** Immediate message prefetch default */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; + private final boolean _immediatePrefetch; private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); @@ -435,6 +444,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { + + _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); + _immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); + _connection = con; _transacted = transacted; if (transacted) @@ -921,15 +934,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _dispatcher.rollback(); } - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - false) // requeue - , BasicRecoverOkBody.class); + if (isStrictAMQP()) + { + // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. + _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + false)); // requeue + _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); + } + else + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + false) // requeue + , BasicRecoverOkBody.class); + } if (!isSuspended) { suspendChannel(false); @@ -1433,7 +1458,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException { - //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); @@ -1709,11 +1733,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueBrowser createBrowser(Queue queue) throws JMSException { + if (isStrictAMQP()) + { + throw new UnsupportedOperationException(); + } + return createBrowser(queue, null); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { + if (isStrictAMQP()) + { + throw new UnsupportedOperationException(); + } + checkNotClosed(); checkValidQueue(queue); return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); @@ -1762,6 +1796,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException { + if (isStrictAMQP()) + { + throw new UnsupportedOperationException(); + } + // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) @@ -1940,7 +1979,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { // If IMMEDIATE_PREFETCH is not set then we need to start fetching - if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT))) + if (!_immediatePrefetch) { // We do this now if this is the first call on a started connection if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false)) @@ -2005,7 +2044,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch - if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT))) + if (!_immediatePrefetch) { // The dispatcher will be null if we have just created this session // so suspend the channel before we register our consumer so that we don't @@ -2392,4 +2431,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public boolean isStrictAMQP() + { + return _strictAMQP; + } + } |