summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-26 15:59:24 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-26 15:59:24 +0000
commit4473eb5d9e9da37e515be063952d284034adbec3 (patch)
tree0464302b3b8797d8b9cb5bd26f2db47ae17fdbbb
parentc68e2bc905e7960761d48df3220fbb174223a0f7 (diff)
downloadqpid-python-4473eb5d9e9da37e515be063952d284034adbec3.tar.gz
QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java client.
This disables the JMS features that rely upon Qpid Java broker specific features. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@532786 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java66
1 files changed, 55 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 93724c9920..700222dabb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -202,11 +202,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */
private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+ /** System property to enable strickt AMQP compliance */
+ public static final String STRICT_AMQP = "STRICT_AMQP";
+ /** Strickt AMQP default */
+ public static final String STRICT_AMQP_DEFAULT = "false";
+
+ private final boolean _strictAMQP;
+
+
/** System property to enable immediate message prefetching */
public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
/** Immediate message prefetch default */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+ private final boolean _immediatePrefetch;
private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
@@ -435,6 +444,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
+
+ _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
+ _immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+
_connection = con;
_transacted = transacted;
if (transacted)
@@ -921,15 +934,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_dispatcher.rollback();
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- false) // requeue
- , BasicRecoverOkBody.class);
+ if (isStrictAMQP())
+ {
+ // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+ _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ false)); // requeue
+ _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+ }
+ else
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ false) // requeue
+ , BasicRecoverOkBody.class);
+ }
if (!isSuspended)
{
suspendChannel(false);
@@ -1433,7 +1458,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
boolean nowait, String messageSelector) throws AMQException
{
- //fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
@@ -1709,11 +1733,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
+ if (isStrictAMQP())
+ {
+ throw new UnsupportedOperationException();
+ }
+
return createBrowser(queue, null);
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
+ if (isStrictAMQP())
+ {
+ throw new UnsupportedOperationException();
+ }
+
checkNotClosed();
checkValidQueue(queue);
return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
@@ -1762,6 +1796,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
+ if (isStrictAMQP())
+ {
+ throw new UnsupportedOperationException();
+ }
+
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
@@ -1940,7 +1979,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
// If IMMEDIATE_PREFETCH is not set then we need to start fetching
- if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
+ if (!_immediatePrefetch)
{
// We do this now if this is the first call on a started connection
if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false))
@@ -2005,7 +2044,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
- if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
+ if (!_immediatePrefetch)
{
// The dispatcher will be null if we have just created this session
// so suspend the channel before we register our consumer so that we don't
@@ -2392,4 +2431,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ public boolean isStrictAMQP()
+ {
+ return _strictAMQP;
+ }
+
}