diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java | 126 |
1 files changed, 102 insertions, 24 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 86db9d5859..1badbb601c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; @@ -74,8 +78,21 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener /** This flag is used to indicate that the blocked for method has been received. */ private volatile boolean _ready = false; + /** This flag is used to indicate that the received error has been processed. */ + private volatile boolean _errorAck = false; + /** Used to protect the shared event and ready flag between the producer and consumer. */ - private final Object _lock = new Object(); + private final ReentrantLock _lock = new ReentrantLock(); + + /** + * Used to signal that a method has been received + */ + private final Condition _receivedCondition = _lock.newCondition(); + + /** + * Used to signal that a error has been processed + */ + private final Condition _errorConditionAck = _lock.newCondition(); /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ private volatile Exception _error; @@ -126,11 +143,16 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener // we only update the flag from inside the synchronized block // so that the blockForFrame method cannot "miss" an update - it // will only ever read the flag from within the synchronized block - synchronized (_lock) + _lock.lock(); + try { _doneEvt = evt; _ready = ready; - _lock.notify(); + _receivedCondition.signal(); + } + finally + { + _lock.unlock(); } } @@ -159,7 +181,11 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener */ public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException { - synchronized (_lock) + long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout); + + _lock.lock(); + + try { while (!_ready) { @@ -167,13 +193,13 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener { if (timeout == -1) { - _lock.wait(); + _receivedCondition.await(); } else { + nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); - _lock.wait(timeout); - if (!_ready) + if (nanoTimeout <= 0 && !_ready && _error == null) { _error = new AMQTimeoutException("Server did not respond in a timely fashion"); _ready = true; @@ -190,23 +216,32 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener // } } } - } - if (_error != null) - { - if (_error instanceof AMQException) - { - throw (AMQException) _error; - } - else if (_error instanceof FailoverException) - { - // This should ensure that FailoverException is not wrapped and can be caught. - throw (FailoverException) _error; // needed to expose FailoverException. - } - else + + if (_error != null) { - throw new AMQException("Woken up due to " + _error.getClass(), _error); + if (_error instanceof AMQException) + { + throw (AMQException) _error; + } + else if (_error instanceof FailoverException) + { + // This should ensure that FailoverException is not wrapped and can be caught. + throw (FailoverException) _error; // needed to expose FailoverException. + } + else + { + throw new AMQException("Woken up due to " + _error.getClass(), _error); + } } + + } + finally + { + _errorAck = true; + _errorConditionAck.signal(); + _error = null; + _lock.unlock(); } return _doneEvt; @@ -222,12 +257,55 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener { // set the error so that the thread that is blocking (against blockForFrame()) // can pick up the exception and rethrow to the caller - _error = e; - synchronized (_lock) + + _lock.lock(); + + if (_error == null) + { + _error = e; + } + else + { + System.err.println("WARNING: new error arrived while old one not yet processed"); + } + + try { _ready = true; - _lock.notify(); + _receivedCondition.signal(); + + while (!_errorAck) + { + try + { + _errorConditionAck.await(); + } + catch (InterruptedException e1) + { + // + } + } + _errorAck = false; + } + finally + { + _lock.unlock(); } } + + public boolean equals(Object o) + { + + if (o instanceof BlockingMethodFrameListener) + { + BlockingMethodFrameListener other = (BlockingMethodFrameListener) o; + + return _channelId == other._channelId; + } + + return false; + } + + } |