summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-02-05 10:13:21 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-02-05 10:13:21 +0000
commit9a2820488699edbf64cd48d5ce69700a0963d925 (patch)
treed6f1c85102c5cbfe04e63229d2ef03cec714d881
parent5428564d2440189b9d978c3f8467872cc0802252 (diff)
downloadqpid-python-9a2820488699edbf64cd48d5ce69700a0963d925.tar.gz
QPID-2370 : Committing patch to improve broker logging. This will not cleanly apply to trunk due to IO changes.
QPID-1084 : Committed change to prevent flow control threads being created/sent if the channel/session is closed or the state has acutally changed in the mean time. Wrapped .debug statements as per review feedback Merged and adapted these changes from 0.5.x r905592,905596,905605 The AMQMinaProtocolSession Changes were moved to the AMQProtocolEngine git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@906890 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java66
3 files changed, 55 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index ec74f79ace..6d2826b55e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -407,11 +407,13 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
AMQConstant.CHANNEL_ERROR.getName().toString());
+ _logger.info(e.getMessage() + " whilst processing:" + methodBody);
closeConnection(channelId, ce, false);
}
}
catch (AMQConnectionException e)
{
+ _logger.info(e.getMessage() + " whilst processing:" + methodBody);
closeConnection(channelId, e, false);
}
}
@@ -744,7 +746,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
if (_logger.isInfoEnabled())
{
- _logger.info("Closing connection due to: " + e.getMessage());
+ _logger.info("Closing connection due to: " + e);
}
markChannelAwaitingCloseOk(channelId);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index c5b3099f58..6850724b10 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -196,7 +196,7 @@ public class AMQStateManager implements AMQMethodListener
|| (body instanceof ChannelCloseOkBody)
|| (body instanceof ChannelCloseBody)))
{
- throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed");
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index be7af6b21f..49f94edef7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -170,6 +170,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ final AMQSession _thisSession = this;
+
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -438,22 +440,45 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void aboveThreshold(int currentValue)
{
- _logger.debug(
- "Above threshold(" + _prefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- _suspendState.set(true);
- new Thread(new SuspenderRunner(_suspendState)).start();
-
+ // If the session has been closed don't waste time creating a thread to do
+ // flow control
+ if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ {
+ // Only executute change if previous state
+ // was False
+ if (!_suspendState.getAndSet(true))
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(
+ "Above threshold(" + _prefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ }
+ new Thread(new SuspenderRunner(_suspendState)).start();
+ }
+ }
}
public void underThreshold(int currentValue)
{
- _logger.debug(
- "Below threshold(" + _prefetchLowMark
- + ") so unsuspending channel. Current value is " + currentValue);
- _suspendState.set(false);
- new Thread(new SuspenderRunner(_suspendState)).start();
-
+ // If the session has been closed don't waste time creating a thread to do
+ // flow control
+ if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ {
+ // Only executute change if previous state
+ // was true
+ if (_suspendState.getAndSet(false))
+ {
+ if (_logger.isDebugEnabled())
+ {
+
+ _logger.debug(
+ "Below threshold(" + _prefetchLowMark
+ + ") so unsuspending channel. Current value is " + currentValue);
+ }
+ new Thread(new SuspenderRunner(_suspendState)).start();
+ }
+ }
}
});
}
@@ -2947,8 +2972,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId);
+ _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession);
}
+
}
// only call while holding lock
@@ -3110,12 +3136,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
synchronized (_suspensionLock)
{
- suspendChannel(_suspend.get());
+ // If the session has closed by the time we get here
+ // then we should not attempt to write to the sesion/channel.
+ if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ {
+ suspendChannel(_suspend.get());
+ }
}
}
catch (AMQException e)
{
- _logger.warn("Unable to suspend channel");
+ _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Is the _queue empty?" + _queue.isEmpty());
+ _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed));
+ }
}
}
}