summaryrefslogtreecommitdiff
path: root/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java')
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java35
1 files changed, 33 insertions, 2 deletions
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
index ad390fd498..8b792db1f1 100644
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
@@ -241,7 +241,7 @@ public class Receiver implements DeliveryStateHandler
}
if(hasMore)
{
- xfr = receiveFromPrefetch(0L);
+ xfr = receiveFromPrefetch(-1l);
if(xfr== null)
{
// TODO - this is wrong!!!!
@@ -503,6 +503,37 @@ public class Receiver implements DeliveryStateHandler
_endpoint.drain();
}
+ /**
+ * Waits for the receiver to drain or a message to be available to be received.
+ * @return true if the receiver has been drained.
+ */
+ public boolean drainWait()
+ {
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ try
+ {
+ while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
+ {
+ lock.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ return _prefetchQueue.peek()==null && _endpoint.isDrained();
+ }
+
+ /**
+ * Clears the receiver drain so that message delivery can resume.
+ */
+ public void clearDrain()
+ {
+ _endpoint.clearDrain();
+ }
+
public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
{
_endpoint.setLinkCredit(credit);
@@ -558,4 +589,4 @@ public class Receiver implements DeliveryStateHandler
void messageArrived(Receiver receiver);
}
-} \ No newline at end of file
+}