diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-21 23:16:44 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-21 23:16:44 +0000 |
commit | 05fe12bb0ba1bcdb7e608567f1f8baeaf3f3431b (patch) | |
tree | 23527b4696112fe389ff1e1b47c6a7eca704fefc | |
parent | b59aa99fe6f813dcd2cffd1a793ecbe299664217 (diff) | |
download | qpid-python-05fe12bb0ba1bcdb7e608567f1f8baeaf3f3431b.tar.gz |
I added support in the JMS layer to figure out if it received any messages after calling flush
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@568321 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 33 insertions, 46 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java b/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java index 438f8f0605..d8be937e46 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -233,7 +233,7 @@ public interface Session * published them.
* <li>{@link Option#EXCLUSIVE}: <p> Request exclusive subscription access, meaning only this
* ubscription can access the queue.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
*
* @param queue The queue this receiver is receiving messages from.
@@ -423,16 +423,6 @@ public interface Session */
public void messageRelease(RangeSet ranges);
-
- /**
- * Returns the number of message received for this session since
- * {@link Session#messageFlow} has bee invoked.
- *
- * @return The number of message received for this session since
- * {@link Session#messageFlow} has bee invoked.
- */
- public int messagesReceived();
-
// -----------------------------------------------
// Local transaction methods
// ----------------------------------------------
@@ -478,7 +468,7 @@ public interface Session * declaring connection closes.
* <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
* This field allows the client to assert the presence of a queue without modifying the server state.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option
*
@@ -540,7 +530,7 @@ public interface Session * <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages.
* <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
* If the queue has consumers the server does does not delete it but raises a channel exception instead.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* </p>
* <p/>
@@ -569,7 +559,7 @@ public interface Session * exchanges) are purged if/when a server restarts.
* <li> {@link Option#PASSIVE}: <p> If set, the server will not create the exchange.
* The client can use this to check whether an exchange exists without modifying the server state.
- * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+ * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option</p>
*
@@ -596,7 +586,7 @@ public interface Session * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
* exchange has queue bindings the server does not delete it but raises a channel exception
* instead.
- * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+ * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option
*
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java index 6c2d4d477f..9793d3ad8b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java @@ -22,19 +22,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa private ExceptionListener _exceptionListner; private RangeSet _acquiredMessages; private RangeSet _rejectedMessages; - - - public int messagesReceived() - { - // TODO - return 1; - } - - @Override public void sessionClose() - { - super.sessionClose(); - } - + public void messageAcknowledge(RangeSet ranges) { for (Range range : ranges) @@ -97,6 +85,10 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa public void setMessageListener(String destination, MessagePartListener listener) { + if (listener == null) + { + throw new IllegalArgumentException("Cannot set message listener to null"); + } _messageListeners.put(destination, listener); } @@ -105,8 +97,6 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa _exceptionListner = exceptionListner; } - // ugly but nessacery - void setAccquiredMessages(RangeSet acquiredMessages) { _acquiredMessages = acquiredMessages; diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java index dc72f1f975..17646b631e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java @@ -9,7 +9,6 @@ import org.apache.qpidity.MessageReject; import org.apache.qpidity.MessageTransfer; import org.apache.qpidity.QpidException; import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; import org.apache.qpidity.Session; import org.apache.qpidity.SessionClosed; import org.apache.qpidity.SessionDelegate; @@ -54,17 +53,8 @@ public class ClientSessionDelegate extends SessionDelegate _currentTransfer = currentTransfer; _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); _currentMessageListener.messageTransfer(currentTransfer.getId()); - - //a better way is to tell the broker to stop the transfer - if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) - { - RangeSet transfers = new RangeSet(); - transfers.add(_currentTransfer.getId()); - session.messageRelease(transfers); - } } - @Override public void messageReject(Session session, MessageReject struct) { for (Range range : struct.getTransfers()) diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index e726288f89..8bab833ddf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -17,6 +17,8 @@ */ package org.apache.qpidity.jms; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.qpidity.jms.message.QpidMessage; import org.apache.qpidity.RangeSet; import org.apache.qpidity.QpidException; @@ -94,6 +96,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * Nether exceed MAX_MESSAGE_TRANSFERRED */ private int _messageAsyncrhonouslyReceived = 0; + + private AtomicBoolean _messageReceived = new AtomicBoolean(); //----- Constructors /** @@ -354,7 +358,6 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // This indicate to the delivery thread to deliver the message to this consumer // as it can happens that a message is delivered after a receive operation as returned. _isReceiving = true; - int received = 0; if (!_isStopped) { // if this consumer is stopped then this will be call when starting @@ -362,10 +365,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); getSession().getQpidSession().messageFlush(getMessageActorID()); + _messageReceived.set(false); + + //When sync() returns we know whether we have received a message or not. getSession().getQpidSession().sync(); - received = getSession().getQpidSession().messagesReceived(); + //received = getSession().getQpidSession().messagesReceived(); } - if (received == 0 && timeout < 0) + if (_messageReceived.get() && timeout < 0) { // this is a nowait and we havent received a message then we must immediatly return result = null; @@ -501,9 +507,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); getSession().getQpidSession().messageFlush(getMessageActorID()); - getSession().getQpidSession().sync(); - int received = getSession().getQpidSession().messagesReceived(); - if (received == 0 && _isNoWaitIsReceiving) + _messageReceived.set(false); + + // When sync() returns we know whether we have received a message or not. + getSession().getQpidSession().sync(); + + if (_messageReceived.get() && _isNoWaitIsReceiving) { // Right a message nowait is waiting for a message // but no one can be delivered it then need to return @@ -632,4 +641,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer getSession().testQpidException(); } } + + public void notifyMessageReceived() + { + _messageReceived.set(true); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java index ada5d048e1..4dbf86a388 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java @@ -62,6 +62,9 @@ public class QpidMessageListener implements MessageListener { try { + // to be used with flush + _consumer.notifyMessageReceived(); + //convert this message into a JMS one QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); // if consumer is asynchronous then send this message to its session. |