diff options
3 files changed, 55 insertions, 17 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index ec74f79ace..6d2826b55e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -407,11 +407,13 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, AMQConstant.CHANNEL_ERROR.getName().toString()); + _logger.info(e.getMessage() + " whilst processing:" + methodBody); closeConnection(channelId, ce, false); } } catch (AMQConnectionException e) { + _logger.info(e.getMessage() + " whilst processing:" + methodBody); closeConnection(channelId, e, false); } } @@ -744,7 +746,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { if (_logger.isInfoEnabled()) { - _logger.info("Closing connection due to: " + e.getMessage()); + _logger.info("Closing connection due to: " + e); } markChannelAwaitingCloseOk(channelId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index c5b3099f58..6850724b10 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -196,7 +196,7 @@ public class AMQStateManager implements AMQMethodListener || (body instanceof ChannelCloseOkBody) || (body instanceof ChannelCloseBody))) { - throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed"); + throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index be7af6b21f..49f94edef7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -170,6 +170,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + final AMQSession _thisSession = this; + /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -438,22 +440,45 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void aboveThreshold(int currentValue) { - _logger.debug( - "Above threshold(" + _prefetchHighMark - + ") so suspending channel. Current value is " + currentValue); - _suspendState.set(true); - new Thread(new SuspenderRunner(_suspendState)).start(); - + // If the session has been closed don't waste time creating a thread to do + // flow control + if (!(_thisSession.isClosed() || _thisSession.isClosing())) + { + // Only executute change if previous state + // was False + if (!_suspendState.getAndSet(true)) + { + if (_logger.isDebugEnabled()) + { + _logger.debug( + "Above threshold(" + _prefetchHighMark + + ") so suspending channel. Current value is " + currentValue); + } + new Thread(new SuspenderRunner(_suspendState)).start(); + } + } } public void underThreshold(int currentValue) { - _logger.debug( - "Below threshold(" + _prefetchLowMark - + ") so unsuspending channel. Current value is " + currentValue); - _suspendState.set(false); - new Thread(new SuspenderRunner(_suspendState)).start(); - + // If the session has been closed don't waste time creating a thread to do + // flow control + if (!(_thisSession.isClosed() || _thisSession.isClosing())) + { + // Only executute change if previous state + // was true + if (_suspendState.getAndSet(false)) + { + if (_logger.isDebugEnabled()) + { + + _logger.debug( + "Below threshold(" + _prefetchLowMark + + ") so unsuspending channel. Current value is " + currentValue); + } + new Thread(new SuspenderRunner(_suspendState)).start(); + } + } } }); } @@ -2947,8 +2972,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcherLogger.isInfoEnabled()) { - _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId); + _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession); } + } // only call while holding lock @@ -3110,12 +3136,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { synchronized (_suspensionLock) { - suspendChannel(_suspend.get()); + // If the session has closed by the time we get here + // then we should not attempt to write to the sesion/channel. + if (!(_thisSession.isClosed() || _thisSession.isClosing())) + { + suspendChannel(_suspend.get()); + } } } catch (AMQException e) { - _logger.warn("Unable to suspend channel"); + _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e); + if (_logger.isDebugEnabled()) + { + _logger.debug("Is the _queue empty?" + _queue.isEmpty()); + _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed)); + } } } } |