diff options
4 files changed, 38 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index e741d4071c..61c06df7a5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -63,6 +63,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed } catch (Exception e) { + _logger.error("exception creating session:", e); throw new JMSAMQException("cannot create session", e); } return session; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 7f95edf60f..3b51fb8db0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2745,6 +2745,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null)) { + long deliveryTag = message.getDeliveryTag(); + synchronized (_lock) { @@ -2753,27 +2755,24 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _lock.wait(); } - if (message.getDeliveryTag() <= _rollbackMark.get()) + if (tagLE(deliveryTag, _rollbackMark.get())) { rejectMessage(message, true); } else { - if (message.getDeliveryTag() <= _rollbackMark.get()) - { - rejectMessage(message, true); - } - else + synchronized (_messageDeliveryLock) { - synchronized (_messageDeliveryLock) - { - dispatchMessage(message); - } + dispatchMessage(message); } } - } + long current = _rollbackMark.get(); + if (updateRollbackMark(current, deliveryTag)) + { + _rollbackMark.compareAndSet(current, deliveryTag); + } } } catch (InterruptedException e) @@ -2851,6 +2850,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } + abstract boolean tagLE(long tag1, long tag2); + + abstract boolean updateRollbackMark(long current, long deliveryTag); + /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index fdfdc61eb5..4c3d768020 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.util.Serial; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.ErrorCode; @@ -785,4 +786,15 @@ public class AMQSession_0_10 extends AMQSession throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); } } + + final boolean tagLE(long tag1, long tag2) + { + return Serial.le((int) tag1, (int) tag2); + } + + final boolean updateRollbackMark(long currentMark, long deliveryTag) + { + return Serial.lt((int) currentMark, (int) deliveryTag); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 740bd5dace..00aa8e4d31 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -40,7 +40,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AMQSession_0_8 extends AMQSession +public final class AMQSession_0_8 extends AMQSession { /** Used for debugging. */ @@ -453,4 +453,14 @@ public class AMQSession_0_8 extends AMQSession return okHandler._messageCount; } + final boolean tagLE(long tag1, long tag2) + { + return tag1 <= tag2; + } + + final boolean updateRollbackMark(long currentMark, long deliveryTag) + { + return false; + } + } |