summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java66
3 files changed, 55 insertions, 17 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index ec74f79ace..6d2826b55e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -407,11 +407,13 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
AMQConstant.CHANNEL_ERROR.getName().toString());
+ _logger.info(e.getMessage() + " whilst processing:" + methodBody);
closeConnection(channelId, ce, false);
}
}
catch (AMQConnectionException e)
{
+ _logger.info(e.getMessage() + " whilst processing:" + methodBody);
closeConnection(channelId, e, false);
}
}
@@ -744,7 +746,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
if (_logger.isInfoEnabled())
{
- _logger.info("Closing connection due to: " + e.getMessage());
+ _logger.info("Closing connection due to: " + e);
}
markChannelAwaitingCloseOk(channelId);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index c5b3099f58..6850724b10 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -196,7 +196,7 @@ public class AMQStateManager implements AMQMethodListener
|| (body instanceof ChannelCloseOkBody)
|| (body instanceof ChannelCloseBody)))
{
- throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed");
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index be7af6b21f..49f94edef7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -170,6 +170,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ final AMQSession _thisSession = this;
+
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -438,22 +440,45 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void aboveThreshold(int currentValue)
{
- _logger.debug(
- "Above threshold(" + _prefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- _suspendState.set(true);
- new Thread(new SuspenderRunner(_suspendState)).start();
-
+ // If the session has been closed don't waste time creating a thread to do
+ // flow control
+ if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ {
+ // Only executute change if previous state
+ // was False
+ if (!_suspendState.getAndSet(true))
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(
+ "Above threshold(" + _prefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ }
+ new Thread(new SuspenderRunner(_suspendState)).start();
+ }
+ }
}
public void underThreshold(int currentValue)
{
- _logger.debug(
- "Below threshold(" + _prefetchLowMark
- + ") so unsuspending channel. Current value is " + currentValue);
- _suspendState.set(false);
- new Thread(new SuspenderRunner(_suspendState)).start();
-
+ // If the session has been closed don't waste time creating a thread to do
+ // flow control
+ if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ {
+ // Only executute change if previous state
+ // was true
+ if (_suspendState.getAndSet(false))
+ {
+ if (_logger.isDebugEnabled())
+ {
+
+ _logger.debug(
+ "Below threshold(" + _prefetchLowMark
+ + ") so unsuspending channel. Current value is " + currentValue);
+ }
+ new Thread(new SuspenderRunner(_suspendState)).start();
+ }
+ }
}
});
}
@@ -2947,8 +2972,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId);
+ _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession);
}
+
}
// only call while holding lock
@@ -3110,12 +3136,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
synchronized (_suspensionLock)
{
- suspendChannel(_suspend.get());
+ // If the session has closed by the time we get here
+ // then we should not attempt to write to the sesion/channel.
+ if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ {
+ suspendChannel(_suspend.get());
+ }
}
}
catch (AMQException e)
{
- _logger.warn("Unable to suspend channel");
+ _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Is the _queue empty?" + _queue.isEmpty());
+ _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed));
+ }
}
}
}