diff options
| author | Robert Greig <rgreig@apache.org> | 2006-12-16 21:55:31 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2006-12-16 21:55:31 +0000 |
| commit | c95df4c1fdccd685442599c1aaa780c8dd7424b9 (patch) | |
| tree | 81ed196b5c01a28ac07e684657c8b63218fb9fba /java/client/src/main | |
| parent | bcf477d8c2a3e30e5ec109d7af5861d7f344eff1 (diff) | |
| download | qpid-python-c95df4c1fdccd685442599c1aaa780c8dd7424b9.tar.gz | |
QPID-207 : Patch supplied by Rob Godfrey - fix implementation of acknowledge as per JMS spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487903 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
4 files changed, 72 insertions, 5 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 097e6bec62..70a5a1efeb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -39,6 +39,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; + import javax.jms.*; import javax.jms.IllegalStateException; import java.io.Serializable; @@ -49,6 +50,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -136,6 +138,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private volatile AtomicBoolean _stopped = new AtomicBoolean(true); + private final AtomicLong _lastDeliveryTag = new AtomicLong(); /** @@ -181,7 +184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { + consumer.notifyMessage(message, _channelId); + } } else @@ -697,6 +702,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); } + + public void acknowledge() throws JMSException + { + if (getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection is already closed"); + } + if (isClosed()) + { + throw new javax.jms.IllegalStateException("Session is already closed"); + } + acknowledgeMessage(_lastDeliveryTag.get(), true); + + } + + void setLastDeliveredMessage(AbstractJMSMessage message) + { + _lastDeliveryTag.set(message.getDeliveryTag()); + } + + public MessageListener getMessageListener() throws JMSException { checkNotClosed(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 01146844f0..9f9038fddd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -214,10 +214,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { //handle case where connection has already been started, and the dispatcher is blocked //doing a put on the _synchronousQueue - Object msg = _synchronousQueue.poll(); - if (msg != null) + AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll(); + if (jmsMsg != null) { - AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg; + _session.setLastDeliveredMessage(jmsMsg); messageListener.onMessage(jmsMsg); postDeliver(jmsMsg); } @@ -297,8 +297,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { + _session.setLastDeliveredMessage(m); postDeliver(m); } + return m; } catch (InterruptedException e) @@ -324,8 +326,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { + _session.setLastDeliveredMessage(m); postDeliver(m); } + return m; } finally @@ -424,6 +428,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { //we do not need a lock around the test above, and the dispatch below as it is invalid //for an application to alter an installed listener while the session is started + _session.setLastDeliveredMessage(jmsMessage); getMessageListener().onMessage(jmsMessage); postDeliver(jmsMessage); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 8982d5d2e1..75e84fee96 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -43,7 +43,7 @@ import java.util.Collections; import java.util.Enumeration; import java.util.Map; -public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message +public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message { private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); @@ -384,7 +384,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName); } - public void acknowledge() throws JMSException + public void acknowledgeThis() throws JMSException { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge // is not specified. In our case, we only set the session field where client acknowledge mode is specified. @@ -401,6 +401,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } } + public void acknowledge() throws JMSException + { + if(_session != null) + { + _session.acknowledge(); + } + } + /** * This forces concrete classes to implement clearBody() diff --git a/java/client/src/main/java/org/apache/qpid/jms/Message.java b/java/client/src/main/java/org/apache/qpid/jms/Message.java new file mode 100644 index 0000000000..d73e51d755 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/jms/Message.java @@ -0,0 +1,28 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.*;
+
+public interface Message extends javax.jms.Message
+{
+ public void acknowledgeThis() throws JMSException;
+}
|
