summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhil Harvey <philharveyonline@apache.org>2012-12-20 09:48:35 +0000
committerPhil Harvey <philharveyonline@apache.org>2012-12-20 09:48:35 +0000
commit7e577de7a0bd77c87f7b2c1961ec11b0f3b35502 (patch)
treed0414712ffed8f3e5aec75b8ad5f2a40be2c3063
parenta25269e5ba5801b787eebf2fd12d466c9fdba70e (diff)
downloadqpid-python-7e577de7a0bd77c87f7b2c1961ec11b0f3b35502.tar.gz
QPID-4515: improved broker logging, particularly when receiving/sending AMQP 0-8/0-9 frames and when committing transactions.
Work done by Keith (kwall) and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1424382 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java16
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java117
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java17
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java7
7 files changed, 151 insertions, 54 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 851038c6de..eb8723461e 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -902,7 +902,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
LOGGER.debug("Enqueuing message " + messageId + " on queue "
+ (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
- + " [Transaction" + tx + "]");
+ + " in transaction " + tx);
}
_deliveryDb.put(tx, key, value);
}
@@ -1056,7 +1056,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("commitTranImpl completed for [Transaction:" + tx + "]");
+ String transactionType = syncCommit ? "synchronous" : "asynchronous";
+ LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx);
}
}
catch (DatabaseException e)
@@ -1078,7 +1079,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("abortTran called for [Transaction:" + tx + "]");
+ LOGGER.debug("abortTran called for transaction " + tx);
}
try
@@ -1190,7 +1191,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Storing content for message " + messageId + "[Transaction" + tx + "]");
+ LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx);
}
}
@@ -1215,8 +1216,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = "
- + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
+ LOGGER.debug("storeMetaData called for transaction " + tx
+ + ", messageId " + messageId
+ + ", messageMetaData " + messageMetaData);
}
DatabaseEntry key = new DatabaseEntry();
@@ -1230,7 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_messageMetaDataDb.put(tx, key, value);
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]");
+ LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx);
}
}
catch (DatabaseException e)
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
index fe1556b5a6..598d20146c 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -80,7 +80,7 @@ public class CommitThreadWrapper
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
+ LOGGER.debug("complete() called for transaction " + _tx);
}
_complete = true;
@@ -101,7 +101,10 @@ public class CommitThreadWrapper
if(!_syncCommit)
{
- LOGGER.debug("CommitAsync was requested, returning immediately.");
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("CommitAsync was requested, returning immediately.");
+ }
return;
}
@@ -121,6 +124,12 @@ public class CommitThreadWrapper
public synchronized void waitForCompletion()
{
+ long startTime = 0;
+ if(LOGGER.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
+
while (!isComplete())
{
_commitThread.explicitNotify();
@@ -133,6 +142,12 @@ public class CommitThreadWrapper
throw new RuntimeException(e);
}
}
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
}
}
@@ -198,8 +213,20 @@ public class CommitThreadWrapper
try
{
+ long startTime = 0;
+ if(LOGGER.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
+
_environment.flushLog(true);
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("flushLog completed in " + duration + " ms");
+ }
+
for(int i = 0; i < size; i++)
{
BDBCommitFuture commit = _jobQueue.poll();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 76a3a7f224..0826f182fd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -378,7 +378,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if (_logger.isDebugEnabled())
{
- _logger.debug(debugIdentity() + "Content body received on channel " + _channelId);
+ _logger.debug(debugIdentity() + " content body received on channel " + _channelId);
}
try
@@ -1583,6 +1583,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void sync()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("sync() called on channel " + debugIdentity());
+ }
+
AsyncCommand cmd;
while((cmd = _unfinishedCommandsQueue.poll()) != null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
index a68ac5439c..917215a42f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
@@ -218,55 +218,71 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final boolean isRedelivered = entry.isRedelivered();
- final AMQBody returnBlock = new AMQBody()
- {
-
- private AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return _methodRegistry.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
+ final AMQBody returnBlock = new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered);
+ return returnBlock;
+ }
+ private class EncodedDeliveryBody implements AMQBody
+ {
+ private final long _deliveryTag;
+ private final AMQShortString _routingKey;
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _consumerTag;
+ private final boolean _isRedelivered;
+ private AMQBody _underlyingBody;
+
+ private EncodedDeliveryBody(long deliveryTag, AMQShortString routingKey, AMQShortString exchangeName, AMQShortString consumerTag, boolean isRedelivered)
+ {
+ _deliveryTag = deliveryTag;
+ _routingKey = routingKey;
+ _exchangeName = exchangeName;
+ _consumerTag = consumerTag;
+ _isRedelivered = isRedelivered;
+ }
+ public AMQBody createAMQBody()
+ {
+ return _methodRegistry.createBasicDeliverBody(_consumerTag,
+ _deliveryTag,
+ _isRedelivered,
+ _exchangeName,
+ _routingKey);
+ }
- }
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
- public byte getFrameType()
+ public int getSize()
+ {
+ if(_underlyingBody == null)
{
- return AMQMethodBody.TYPE;
+ _underlyingBody = createAMQBody();
}
+ return _underlyingBody.getSize();
+ }
- public int getSize()
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ if(_underlyingBody == null)
{
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
+ _underlyingBody = createAMQBody();
}
+ _underlyingBody.writePayload(buffer);
+ }
- public void writePayload(DataOutput buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
+ @Override
+ public String toString()
+ {
+ return "[" + getClass().getSimpleName() + " underlyingBody: " + String.valueOf(_underlyingBody) + "]";
+ }
}
private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
@@ -368,7 +384,6 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
_methodBody = methodBody;
_headerBody = headerBody;
_contentBody = contentBody;
-
}
public long getSize()
@@ -380,6 +395,19 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[").append(getClass().getSimpleName())
+ .append(" methodBody=").append(_methodBody)
+ .append(", headerBody=").append(_headerBody)
+ .append(", contentBody=").append(_contentBody)
+ .append(", channel=").append(_channel).append("]");
+ return builder.toString();
+ }
+
}
public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
@@ -408,6 +436,17 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append(getClass().getSimpleName())
+ .append("methodBody=").append(_methodBody)
+ .append(", headerBody=").append(_headerBody)
+ .append(", channel=").append(_channel).append("]");
+ return builder.toString();
+ }
}
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 72c21d357e..f77f3a764a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -303,9 +303,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
try
{
+ long startTime = 0;
+ String frameToString = null;
if (_logger.isDebugEnabled())
{
- _logger.debug("Frame Received: " + frame);
+ startTime = System.currentTimeMillis();
+ frameToString = frame.toString();
+ _logger.debug("RECV: " + frame);
}
// Check that this channel is not closing
@@ -340,6 +344,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeChannel(channelId);
throw e;
}
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
+ }
}
finally
{
@@ -543,6 +552,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("SEND: " + frame);
+ }
+
_sender.send(buf);
final long time = System.currentTimeMillis();
_lastIoTime = time;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index f11fb1086e..df95ce46d5 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -384,10 +384,20 @@ public class LocalTransaction implements ServerTransaction
private void doPostTransactionActions()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Beginning " + _postTransactionActions.size() + " post transaction actions");
+ }
+
for(int i = 0; i < _postTransactionActions.size(); i++)
{
_postTransactionActions.get(i).postCommit();
}
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Completed post transaction actions");
+ }
}
public void rollback()
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 95c3e4669f..d66415c659 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -25,7 +25,6 @@ import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.transport.network.Frame;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
-
import static org.apache.qpid.transport.Option.COMPLETED;
import static org.apache.qpid.transport.Option.SYNC;
import static org.apache.qpid.transport.Option.TIMELY_REPLY;
@@ -414,7 +413,7 @@ public class Session extends SessionInvoker
if(log.isDebugEnabled())
{
- log.debug("ID: [%s] %s", this.channel, id);
+ log.debug("identify: ch=%s, commandId=%s", this.channel, id);
}
if ((id & 0xff) == 0)
@@ -443,7 +442,7 @@ public class Session extends SessionInvoker
{
if(log.isDebugEnabled())
{
- log.debug("%s processed([%d,%d]) %s %s", this, lower, upper, syncPoint, maxProcessed);
+ log.debug("%s ch=%s processed([%d,%d]) %s %s", this, channel, lower, upper, syncPoint, maxProcessed);
}
boolean flush;
@@ -451,7 +450,7 @@ public class Session extends SessionInvoker
{
if(log.isDebugEnabled())
{
- log.debug("%s", processed);
+ log.debug("%s processed: %s", this, processed);
}
if (ge(upper, commandsIn))