diff options
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 23 | ||||
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java | 8 |
2 files changed, 21 insertions, 10 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index ade7ab8033..ccae5e31e5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -64,6 +64,11 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); + public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack"; + + private final boolean _syncAfterClientAck = + Boolean.parseBoolean(System.getProperty(QPID_SYNC_AFTER_CLIENT_ACK, "true")); + /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked @@ -120,8 +125,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return getProtocolHandler().getProtocolVersion(); } - protected void acknowledgeImpl() + protected void acknowledgeImpl() throws JMSException { + boolean syncRequired = false; while (true) { Long tag = getUnacknowledgedMessageTags().poll(); @@ -131,6 +137,19 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } acknowledgeMessage(tag, false); + syncRequired = true; + } + + try + { + if (syncRequired && _syncAfterClientAck) + { + sync(); + } + } + catch (AMQException a) + { + throw new JMSAMQException("Failed to sync after acknowledge", a); } } @@ -681,7 +700,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe boolean noLocal, boolean noWait) throws AMQException { - throw new UnsupportedOperationException("The new addressing based sytanx is " + throw new UnsupportedOperationException("The new addressing based syntax is " + "not supported for AMQP 0-8/0-9 versions"); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index 751066abbc..4ad9069ba0 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -189,14 +189,6 @@ public class TestAMQSession extends AMQSession_0_8 { } - public void handleAddressBasedDestination(AMQDestination dest, - boolean isConsumer, - boolean noWait) throws AMQException - { - throw new UnsupportedOperationException("The new addressing based sytanx is " - + "not supported for AMQP 0-8/0-9 versions"); - } - @Override protected void flushAcknowledgments() { |