summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-06-16 21:04:01 +0000
committerRafael H. Schloming <rhs@apache.org>2008-06-16 21:04:01 +0000
commite9d18380aeda446a89c697971506c733a51f3287 (patch)
tree9dc7df3ab9b4af7abe3d8dd2e791aa030153cd76
parentab5ed1383ac95e1d994aaf742a0aeeeade69f0d9 (diff)
downloadqpid-python-e9d18380aeda446a89c697971506c733a51f3287.tar.gz
QPID-1139: use RFC1982 comparisons for rollback mark and update rollback mark to track dispatched messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@668311 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java25
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java12
4 files changed, 38 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index e741d4071c..61c06df7a5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -63,6 +63,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
}
catch (Exception e)
{
+ _logger.error("exception creating session:", e);
throw new JMSAMQException("cannot create session", e);
}
return session;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 7f95edf60f..3b51fb8db0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2745,6 +2745,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null))
{
+ long deliveryTag = message.getDeliveryTag();
+
synchronized (_lock)
{
@@ -2753,27 +2755,24 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_lock.wait();
}
- if (message.getDeliveryTag() <= _rollbackMark.get())
+ if (tagLE(deliveryTag, _rollbackMark.get()))
{
rejectMessage(message, true);
}
else
{
- if (message.getDeliveryTag() <= _rollbackMark.get())
- {
- rejectMessage(message, true);
- }
- else
+ synchronized (_messageDeliveryLock)
{
- synchronized (_messageDeliveryLock)
- {
- dispatchMessage(message);
- }
+ dispatchMessage(message);
}
}
-
}
+ long current = _rollbackMark.get();
+ if (updateRollbackMark(current, deliveryTag))
+ {
+ _rollbackMark.compareAndSet(current, deliveryTag);
+ }
}
}
catch (InterruptedException e)
@@ -2851,6 +2850,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
+ abstract boolean tagLE(long tag1, long tag2);
+
+ abstract boolean updateRollbackMark(long current, long deliveryTag);
+
/*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
boolean read) throws AMQException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index fdfdc61eb5..4c3d768020 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.util.Serial;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.ErrorCode;
@@ -785,4 +786,15 @@ public class AMQSession_0_10 extends AMQSession
throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
}
}
+
+ final boolean tagLE(long tag1, long tag2)
+ {
+ return Serial.le((int) tag1, (int) tag2);
+ }
+
+ final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ {
+ return Serial.lt((int) currentMark, (int) deliveryTag);
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 740bd5dace..00aa8e4d31 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -40,7 +40,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AMQSession_0_8 extends AMQSession
+public final class AMQSession_0_8 extends AMQSession
{
/** Used for debugging. */
@@ -453,4 +453,14 @@ public class AMQSession_0_8 extends AMQSession
return okHandler._messageCount;
}
+ final boolean tagLE(long tag1, long tag2)
+ {
+ return tag1 <= tag2;
+ }
+
+ final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ {
+ return false;
+ }
+
}