From c95df4c1fdccd685442599c1aaa780c8dd7424b9 Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Sat, 16 Dec 2006 21:55:31 +0000 Subject: QPID-207 : Patch supplied by Rob Godfrey - fix implementation of acknowledge as per JMS spec git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487903 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 26 ++++++++++++++++++++ .../apache/qpid/client/BasicMessageConsumer.java | 11 ++++++--- .../qpid/client/message/AbstractJMSMessage.java | 12 ++++++++-- .../src/main/java/org/apache/qpid/jms/Message.java | 28 ++++++++++++++++++++++ 4 files changed, 72 insertions(+), 5 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/jms/Message.java (limited to 'java/client/src/main') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 097e6bec62..70a5a1efeb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -39,6 +39,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; + import javax.jms.*; import javax.jms.IllegalStateException; import java.io.Serializable; @@ -49,6 +50,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -136,6 +138,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private volatile AtomicBoolean _stopped = new AtomicBoolean(true); + private final AtomicLong _lastDeliveryTag = new AtomicLong(); /** @@ -181,7 +184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { + consumer.notifyMessage(message, _channelId); + } } else @@ -697,6 +702,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); } + + public void acknowledge() throws JMSException + { + if (getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection is already closed"); + } + if (isClosed()) + { + throw new javax.jms.IllegalStateException("Session is already closed"); + } + acknowledgeMessage(_lastDeliveryTag.get(), true); + + } + + void setLastDeliveredMessage(AbstractJMSMessage message) + { + _lastDeliveryTag.set(message.getDeliveryTag()); + } + + public MessageListener getMessageListener() throws JMSException { checkNotClosed(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 01146844f0..9f9038fddd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -214,10 +214,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { //handle case where connection has already been started, and the dispatcher is blocked //doing a put on the _synchronousQueue - Object msg = _synchronousQueue.poll(); - if (msg != null) + AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll(); + if (jmsMsg != null) { - AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg; + _session.setLastDeliveredMessage(jmsMsg); messageListener.onMessage(jmsMsg); postDeliver(jmsMsg); } @@ -297,8 +297,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { + _session.setLastDeliveredMessage(m); postDeliver(m); } + return m; } catch (InterruptedException e) @@ -324,8 +326,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { + _session.setLastDeliveredMessage(m); postDeliver(m); } + return m; } finally @@ -424,6 +428,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { //we do not need a lock around the test above, and the dispatch below as it is invalid //for an application to alter an installed listener while the session is started + _session.setLastDeliveredMessage(jmsMessage); getMessageListener().onMessage(jmsMessage); postDeliver(jmsMessage); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 8982d5d2e1..75e84fee96 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -43,7 +43,7 @@ import java.util.Collections; import java.util.Enumeration; import java.util.Map; -public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message +public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message { private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); @@ -384,7 +384,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName); } - public void acknowledge() throws JMSException + public void acknowledgeThis() throws JMSException { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge // is not specified. In our case, we only set the session field where client acknowledge mode is specified. @@ -401,6 +401,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } } + public void acknowledge() throws JMSException + { + if(_session != null) + { + _session.acknowledge(); + } + } + /** * This forces concrete classes to implement clearBody() diff --git a/java/client/src/main/java/org/apache/qpid/jms/Message.java b/java/client/src/main/java/org/apache/qpid/jms/Message.java new file mode 100644 index 0000000000..d73e51d755 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/jms/Message.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms; + +import javax.jms.*; + +public interface Message extends javax.jms.Message +{ + public void acknowledgeThis() throws JMSException; +} -- cgit v1.2.1