summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-16 21:55:31 +0000
committerRobert Greig <rgreig@apache.org>2006-12-16 21:55:31 +0000
commitc95df4c1fdccd685442599c1aaa780c8dd7424b9 (patch)
tree81ed196b5c01a28ac07e684657c8b63218fb9fba /java/client/src/main
parentbcf477d8c2a3e30e5ec109d7af5861d7f344eff1 (diff)
downloadqpid-python-c95df4c1fdccd685442599c1aaa780c8dd7424b9.tar.gz
QPID-207 : Patch supplied by Rob Godfrey - fix implementation of acknowledge as per JMS spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487903 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java26
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/Message.java28
4 files changed, 72 insertions, 5 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 097e6bec62..70a5a1efeb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -39,6 +39,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
+
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.io.Serializable;
@@ -49,6 +50,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -136,6 +138,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
+ private final AtomicLong _lastDeliveryTag = new AtomicLong();
/**
@@ -181,7 +184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
+
consumer.notifyMessage(message, _channelId);
+
}
}
else
@@ -697,6 +702,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
}
+
+ public void acknowledge() throws JMSException
+ {
+ if (getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+ if (isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Session is already closed");
+ }
+ acknowledgeMessage(_lastDeliveryTag.get(), true);
+
+ }
+
+ void setLastDeliveredMessage(AbstractJMSMessage message)
+ {
+ _lastDeliveryTag.set(message.getDeliveryTag());
+ }
+
+
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 01146844f0..9f9038fddd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -214,10 +214,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
//handle case where connection has already been started, and the dispatcher is blocked
//doing a put on the _synchronousQueue
- Object msg = _synchronousQueue.poll();
- if (msg != null)
+ AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
+ if (jmsMsg != null)
{
- AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg;
+ _session.setLastDeliveredMessage(jmsMsg);
messageListener.onMessage(jmsMsg);
postDeliver(jmsMsg);
}
@@ -297,8 +297,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
+ _session.setLastDeliveredMessage(m);
postDeliver(m);
}
+
return m;
}
catch (InterruptedException e)
@@ -324,8 +326,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
+ _session.setLastDeliveredMessage(m);
postDeliver(m);
}
+
return m;
}
finally
@@ -424,6 +428,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
//we do not need a lock around the test above, and the dispatch below as it is invalid
//for an application to alter an installed listener while the session is started
+ _session.setLastDeliveredMessage(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 8982d5d2e1..75e84fee96 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -43,7 +43,7 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;
-public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message
+public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
@@ -384,7 +384,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName);
}
- public void acknowledge() throws JMSException
+ public void acknowledgeThis() throws JMSException
{
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
@@ -401,6 +401,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
}
}
+ public void acknowledge() throws JMSException
+ {
+ if(_session != null)
+ {
+ _session.acknowledge();
+ }
+ }
+
/**
* This forces concrete classes to implement clearBody()
diff --git a/java/client/src/main/java/org/apache/qpid/jms/Message.java b/java/client/src/main/java/org/apache/qpid/jms/Message.java
new file mode 100644
index 0000000000..d73e51d755
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/jms/Message.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.*;
+
+public interface Message extends javax.jms.Message
+{
+ public void acknowledgeThis() throws JMSException;
+}