summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-21 23:16:44 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-21 23:16:44 +0000
commit05fe12bb0ba1bcdb7e608567f1f8baeaf3f3431b (patch)
tree23527b4696112fe389ff1e1b47c6a7eca704fefc
parentb59aa99fe6f813dcd2cffd1a793ecbe299664217 (diff)
downloadqpid-python-05fe12bb0ba1bcdb7e608567f1f8baeaf3f3431b.tar.gz
I added support in the JMS layer to figure out if it received any messages after calling flush
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@568321 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java26
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java3
5 files changed, 33 insertions, 46 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java b/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
index 438f8f0605..d8be937e46 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -233,7 +233,7 @@ public interface Session
* published them.
* <li>{@link Option#EXCLUSIVE}: <p> Request exclusive subscription access, meaning only this
* ubscription can access the queue.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
*
* @param queue The queue this receiver is receiving messages from.
@@ -423,16 +423,6 @@ public interface Session
*/
public void messageRelease(RangeSet ranges);
-
- /**
- * Returns the number of message received for this session since
- * {@link Session#messageFlow} has bee invoked.
- *
- * @return The number of message received for this session since
- * {@link Session#messageFlow} has bee invoked.
- */
- public int messagesReceived();
-
// -----------------------------------------------
// Local transaction methods
// ----------------------------------------------
@@ -478,7 +468,7 @@ public interface Session
* declaring connection closes.
* <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
* This field allows the client to assert the presence of a queue without modifying the server state.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option
*
@@ -540,7 +530,7 @@ public interface Session
* <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages.
* <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
* If the queue has consumers the server does does not delete it but raises a channel exception instead.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* </p>
* <p/>
@@ -569,7 +559,7 @@ public interface Session
* exchanges) are purged if/when a server restarts.
* <li> {@link Option#PASSIVE}: <p> If set, the server will not create the exchange.
* The client can use this to check whether an exchange exists without modifying the server state.
- * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+ * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option</p>
*
@@ -596,7 +586,7 @@ public interface Session
* <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
* exchange has queue bindings the server does not delete it but raises a channel exception
* instead.
- * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+ * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option
*
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
index 6c2d4d477f..9793d3ad8b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
@@ -22,19 +22,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
private ExceptionListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
-
-
- public int messagesReceived()
- {
- // TODO
- return 1;
- }
-
- @Override public void sessionClose()
- {
- super.sessionClose();
- }
-
+
public void messageAcknowledge(RangeSet ranges)
{
for (Range range : ranges)
@@ -97,6 +85,10 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
public void setMessageListener(String destination, MessagePartListener listener)
{
+ if (listener == null)
+ {
+ throw new IllegalArgumentException("Cannot set message listener to null");
+ }
_messageListeners.put(destination, listener);
}
@@ -105,8 +97,6 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
_exceptionListner = exceptionListner;
}
- // ugly but nessacery
-
void setAccquiredMessages(RangeSet acquiredMessages)
{
_acquiredMessages = acquiredMessages;
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
index dc72f1f975..17646b631e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
@@ -9,7 +9,6 @@ import org.apache.qpidity.MessageReject;
import org.apache.qpidity.MessageTransfer;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Range;
-import org.apache.qpidity.RangeSet;
import org.apache.qpidity.Session;
import org.apache.qpidity.SessionClosed;
import org.apache.qpidity.SessionDelegate;
@@ -54,17 +53,8 @@ public class ClientSessionDelegate extends SessionDelegate
_currentTransfer = currentTransfer;
_currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
_currentMessageListener.messageTransfer(currentTransfer.getId());
-
- //a better way is to tell the broker to stop the transfer
- if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1)
- {
- RangeSet transfers = new RangeSet();
- transfers.add(_currentTransfer.getId());
- session.messageRelease(transfers);
- }
}
-
@Override public void messageReject(Session session, MessageReject struct)
{
for (Range range : struct.getTransfers())
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index e726288f89..8bab833ddf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -17,6 +17,8 @@
*/
package org.apache.qpidity.jms;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.qpidity.jms.message.QpidMessage;
import org.apache.qpidity.RangeSet;
import org.apache.qpidity.QpidException;
@@ -94,6 +96,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* Nether exceed MAX_MESSAGE_TRANSFERRED
*/
private int _messageAsyncrhonouslyReceived = 0;
+
+ private AtomicBoolean _messageReceived = new AtomicBoolean();
//----- Constructors
/**
@@ -354,7 +358,6 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// This indicate to the delivery thread to deliver the message to this consumer
// as it can happens that a message is delivered after a receive operation as returned.
_isReceiving = true;
- int received = 0;
if (!_isStopped)
{
// if this consumer is stopped then this will be call when starting
@@ -362,10 +365,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
.messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
1);
getSession().getQpidSession().messageFlush(getMessageActorID());
+ _messageReceived.set(false);
+
+ //When sync() returns we know whether we have received a message or not.
getSession().getQpidSession().sync();
- received = getSession().getQpidSession().messagesReceived();
+ //received = getSession().getQpidSession().messagesReceived();
}
- if (received == 0 && timeout < 0)
+ if (_messageReceived.get() && timeout < 0)
{
// this is a nowait and we havent received a message then we must immediatly return
result = null;
@@ -501,9 +507,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
.messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
getSession().getQpidSession().messageFlush(getMessageActorID());
- getSession().getQpidSession().sync();
- int received = getSession().getQpidSession().messagesReceived();
- if (received == 0 && _isNoWaitIsReceiving)
+ _messageReceived.set(false);
+
+ // When sync() returns we know whether we have received a message or not.
+ getSession().getQpidSession().sync();
+
+ if (_messageReceived.get() && _isNoWaitIsReceiving)
{
// Right a message nowait is waiting for a message
// but no one can be delivered it then need to return
@@ -632,4 +641,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
getSession().testQpidException();
}
}
+
+ public void notifyMessageReceived()
+ {
+ _messageReceived.set(true);
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
index ada5d048e1..4dbf86a388 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
@@ -62,6 +62,9 @@ public class QpidMessageListener implements MessageListener
{
try
{
+ // to be used with flush
+ _consumer.notifyMessageReceived();
+
//convert this message into a JMS one
QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
// if consumer is asynchronous then send this message to its session.