summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java126
1 files changed, 102 insertions, 24 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 86db9d5859..1badbb601c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.client.protocol;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
@@ -74,8 +78,21 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
/** This flag is used to indicate that the blocked for method has been received. */
private volatile boolean _ready = false;
+ /** This flag is used to indicate that the received error has been processed. */
+ private volatile boolean _errorAck = false;
+
/** Used to protect the shared event and ready flag between the producer and consumer. */
- private final Object _lock = new Object();
+ private final ReentrantLock _lock = new ReentrantLock();
+
+ /**
+ * Used to signal that a method has been received
+ */
+ private final Condition _receivedCondition = _lock.newCondition();
+
+ /**
+ * Used to signal that a error has been processed
+ */
+ private final Condition _errorConditionAck = _lock.newCondition();
/** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
private volatile Exception _error;
@@ -126,11 +143,16 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
// we only update the flag from inside the synchronized block
// so that the blockForFrame method cannot "miss" an update - it
// will only ever read the flag from within the synchronized block
- synchronized (_lock)
+ _lock.lock();
+ try
{
_doneEvt = evt;
_ready = ready;
- _lock.notify();
+ _receivedCondition.signal();
+ }
+ finally
+ {
+ _lock.unlock();
}
}
@@ -159,7 +181,11 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
*/
public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
{
- synchronized (_lock)
+ long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
+
+ _lock.lock();
+
+ try
{
while (!_ready)
{
@@ -167,13 +193,13 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
if (timeout == -1)
{
- _lock.wait();
+ _receivedCondition.await();
}
else
{
+ nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout);
- _lock.wait(timeout);
- if (!_ready)
+ if (nanoTimeout <= 0 && !_ready && _error == null)
{
_error = new AMQTimeoutException("Server did not respond in a timely fashion");
_ready = true;
@@ -190,23 +216,32 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
// }
}
}
- }
- if (_error != null)
- {
- if (_error instanceof AMQException)
- {
- throw (AMQException) _error;
- }
- else if (_error instanceof FailoverException)
- {
- // This should ensure that FailoverException is not wrapped and can be caught.
- throw (FailoverException) _error; // needed to expose FailoverException.
- }
- else
+
+ if (_error != null)
{
- throw new AMQException("Woken up due to " + _error.getClass(), _error);
+ if (_error instanceof AMQException)
+ {
+ throw (AMQException) _error;
+ }
+ else if (_error instanceof FailoverException)
+ {
+ // This should ensure that FailoverException is not wrapped and can be caught.
+ throw (FailoverException) _error; // needed to expose FailoverException.
+ }
+ else
+ {
+ throw new AMQException("Woken up due to " + _error.getClass(), _error);
+ }
}
+
+ }
+ finally
+ {
+ _errorAck = true;
+ _errorConditionAck.signal();
+ _error = null;
+ _lock.unlock();
}
return _doneEvt;
@@ -222,12 +257,55 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
// set the error so that the thread that is blocking (against blockForFrame())
// can pick up the exception and rethrow to the caller
- _error = e;
- synchronized (_lock)
+
+ _lock.lock();
+
+ if (_error == null)
+ {
+ _error = e;
+ }
+ else
+ {
+ System.err.println("WARNING: new error arrived while old one not yet processed");
+ }
+
+ try
{
_ready = true;
- _lock.notify();
+ _receivedCondition.signal();
+
+ while (!_errorAck)
+ {
+ try
+ {
+ _errorConditionAck.await();
+ }
+ catch (InterruptedException e1)
+ {
+ //
+ }
+ }
+ _errorAck = false;
+ }
+ finally
+ {
+ _lock.unlock();
}
}
+
+ public boolean equals(Object o)
+ {
+
+ if (o instanceof BlockingMethodFrameListener)
+ {
+ BlockingMethodFrameListener other = (BlockingMethodFrameListener) o;
+
+ return _channelId == other._channelId;
+ }
+
+ return false;
+ }
+
+
}