diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-06-16 21:04:01 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-06-16 21:04:01 +0000 |
commit | e9d18380aeda446a89c697971506c733a51f3287 (patch) | |
tree | 9dc7df3ab9b4af7abe3d8dd2e791aa030153cd76 | |
parent | ab5ed1383ac95e1d994aaf742a0aeeeade69f0d9 (diff) | |
download | qpid-python-e9d18380aeda446a89c697971506c733a51f3287.tar.gz |
QPID-1139: use RFC1982 comparisons for rollback mark and update rollback mark to track dispatched messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@668311 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 38 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index e741d4071c..61c06df7a5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -63,6 +63,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed } catch (Exception e) { + _logger.error("exception creating session:", e); throw new JMSAMQException("cannot create session", e); } return session; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 7f95edf60f..3b51fb8db0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2745,6 +2745,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null)) { + long deliveryTag = message.getDeliveryTag(); + synchronized (_lock) { @@ -2753,27 +2755,24 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _lock.wait(); } - if (message.getDeliveryTag() <= _rollbackMark.get()) + if (tagLE(deliveryTag, _rollbackMark.get())) { rejectMessage(message, true); } else { - if (message.getDeliveryTag() <= _rollbackMark.get()) - { - rejectMessage(message, true); - } - else + synchronized (_messageDeliveryLock) { - synchronized (_messageDeliveryLock) - { - dispatchMessage(message); - } + dispatchMessage(message); } } - } + long current = _rollbackMark.get(); + if (updateRollbackMark(current, deliveryTag)) + { + _rollbackMark.compareAndSet(current, deliveryTag); + } } } catch (InterruptedException e) @@ -2851,6 +2850,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } + abstract boolean tagLE(long tag1, long tag2); + + abstract boolean updateRollbackMark(long current, long deliveryTag); + /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index fdfdc61eb5..4c3d768020 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.util.Serial; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.ErrorCode; @@ -785,4 +786,15 @@ public class AMQSession_0_10 extends AMQSession throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); } } + + final boolean tagLE(long tag1, long tag2) + { + return Serial.le((int) tag1, (int) tag2); + } + + final boolean updateRollbackMark(long currentMark, long deliveryTag) + { + return Serial.lt((int) currentMark, (int) deliveryTag); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 740bd5dace..00aa8e4d31 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -40,7 +40,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AMQSession_0_8 extends AMQSession +public final class AMQSession_0_8 extends AMQSession { /** Used for debugging. */ @@ -453,4 +453,14 @@ public class AMQSession_0_8 extends AMQSession return okHandler._messageCount; } + final boolean tagLE(long tag1, long tag2) + { + return tag1 <= tag2; + } + + final boolean updateRollbackMark(long currentMark, long deliveryTag) + { + return false; + } + } |