diff options
author | Phil Harvey <philharveyonline@apache.org> | 2012-12-20 09:48:35 +0000 |
---|---|---|
committer | Phil Harvey <philharveyonline@apache.org> | 2012-12-20 09:48:35 +0000 |
commit | 7e577de7a0bd77c87f7b2c1961ec11b0f3b35502 (patch) | |
tree | d0414712ffed8f3e5aec75b8ad5f2a40be2c3063 | |
parent | a25269e5ba5801b787eebf2fd12d466c9fdba70e (diff) | |
download | qpid-python-7e577de7a0bd77c87f7b2c1961ec11b0f3b35502.tar.gz |
QPID-4515: improved broker logging, particularly when receiving/sending AMQP 0-8/0-9 frames and when committing transactions.
Work done by Keith (kwall) and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1424382 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 151 insertions, 54 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 851038c6de..eb8723461e 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -902,7 +902,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { LOGGER.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " [Transaction" + tx + "]"); + + " in transaction " + tx); } _deliveryDb.put(tx, key, value); } @@ -1056,7 +1056,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore if (LOGGER.isDebugEnabled()) { - LOGGER.debug("commitTranImpl completed for [Transaction:" + tx + "]"); + String transactionType = syncCommit ? "synchronous" : "asynchronous"; + LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx); } } catch (DatabaseException e) @@ -1078,7 +1079,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("abortTran called for [Transaction:" + tx + "]"); + LOGGER.debug("abortTran called for transaction " + tx); } try @@ -1190,7 +1191,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Storing content for message " + messageId + "[Transaction" + tx + "]"); + LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx); } } @@ -1215,8 +1216,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = " - + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called"); + LOGGER.debug("storeMetaData called for transaction " + tx + + ", messageId " + messageId + + ", messageMetaData " + messageMetaData); } DatabaseEntry key = new DatabaseEntry(); @@ -1230,7 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore _messageMetaDataDb.put(tx, key, value); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]"); + LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx); } } catch (DatabaseException e) diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java index fe1556b5a6..598d20146c 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -80,7 +80,7 @@ public class CommitThreadWrapper { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); + LOGGER.debug("complete() called for transaction " + _tx); } _complete = true; @@ -101,7 +101,10 @@ public class CommitThreadWrapper if(!_syncCommit) { - LOGGER.debug("CommitAsync was requested, returning immediately."); + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("CommitAsync was requested, returning immediately."); + } return; } @@ -121,6 +124,12 @@ public class CommitThreadWrapper public synchronized void waitForCompletion() { + long startTime = 0; + if(LOGGER.isDebugEnabled()) + { + startTime = System.currentTimeMillis(); + } + while (!isComplete()) { _commitThread.explicitNotify(); @@ -133,6 +142,12 @@ public class CommitThreadWrapper throw new RuntimeException(e); } } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } } } @@ -198,8 +213,20 @@ public class CommitThreadWrapper try { + long startTime = 0; + if(LOGGER.isDebugEnabled()) + { + startTime = System.currentTimeMillis(); + } + _environment.flushLog(true); + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("flushLog completed in " + duration + " ms"); + } + for(int i = 0; i < size; i++) { BDBCommitFuture commit = _jobQueue.poll(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 76a3a7f224..0826f182fd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -378,7 +378,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if (_logger.isDebugEnabled()) { - _logger.debug(debugIdentity() + "Content body received on channel " + _channelId); + _logger.debug(debugIdentity() + " content body received on channel " + _channelId); } try @@ -1583,6 +1583,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void sync() { + if(_logger.isDebugEnabled()) + { + _logger.debug("sync() called on channel " + debugIdentity()); + } + AsyncCommand cmd; while((cmd = _unfinishedCommandsQueue.poll()) != null) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java index a68ac5439c..917215a42f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java @@ -218,55 +218,71 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter final boolean isRedelivered = entry.isRedelivered(); - final AMQBody returnBlock = new AMQBody() - { - - private AMQBody _underlyingBody; - - public AMQBody createAMQBody() - { - return _methodRegistry.createBasicDeliverBody(consumerTag, - deliveryTag, - isRedelivered, - exchangeName, - routingKey); - - + final AMQBody returnBlock = new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered); + return returnBlock; + } + private class EncodedDeliveryBody implements AMQBody + { + private final long _deliveryTag; + private final AMQShortString _routingKey; + private final AMQShortString _exchangeName; + private final AMQShortString _consumerTag; + private final boolean _isRedelivered; + private AMQBody _underlyingBody; + + private EncodedDeliveryBody(long deliveryTag, AMQShortString routingKey, AMQShortString exchangeName, AMQShortString consumerTag, boolean isRedelivered) + { + _deliveryTag = deliveryTag; + _routingKey = routingKey; + _exchangeName = exchangeName; + _consumerTag = consumerTag; + _isRedelivered = isRedelivered; + } + public AMQBody createAMQBody() + { + return _methodRegistry.createBasicDeliverBody(_consumerTag, + _deliveryTag, + _isRedelivered, + _exchangeName, + _routingKey); + } - } + public byte getFrameType() + { + return AMQMethodBody.TYPE; + } - public byte getFrameType() + public int getSize() + { + if(_underlyingBody == null) { - return AMQMethodBody.TYPE; + _underlyingBody = createAMQBody(); } + return _underlyingBody.getSize(); + } - public int getSize() + public void writePayload(DataOutput buffer) throws IOException + { + if(_underlyingBody == null) { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - return _underlyingBody.getSize(); + _underlyingBody = createAMQBody(); } + _underlyingBody.writePayload(buffer); + } - public void writePayload(DataOutput buffer) throws IOException - { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - _underlyingBody.writePayload(buffer); - } + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) + throws AMQException + { + throw new AMQException("This block should never be dispatched!"); + } - public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) - throws AMQException - { - throw new AMQException("This block should never be dispatched!"); - } - }; - return returnBlock; + @Override + public String toString() + { + return "[" + getClass().getSimpleName() + " underlyingBody: " + String.valueOf(_underlyingBody) + "]"; + } } private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) @@ -368,7 +384,6 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter _methodBody = methodBody; _headerBody = headerBody; _contentBody = contentBody; - } public long getSize() @@ -380,6 +395,19 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter { AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); } + + @Override + public String toString() + { + StringBuilder builder = new StringBuilder(); + builder.append("[").append(getClass().getSimpleName()) + .append(" methodBody=").append(_methodBody) + .append(", headerBody=").append(_headerBody) + .append(", contentBody=").append(_contentBody) + .append(", channel=").append(_channel).append("]"); + return builder.toString(); + } + } public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock @@ -408,6 +436,17 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter { AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); } + + @Override + public String toString() + { + StringBuilder builder = new StringBuilder(); + builder.append(getClass().getSimpleName()) + .append("methodBody=").append(_methodBody) + .append(", headerBody=").append(_headerBody) + .append(", channel=").append(_channel).append("]"); + return builder.toString(); + } } }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 72c21d357e..f77f3a764a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -303,9 +303,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi try { + long startTime = 0; + String frameToString = null; if (_logger.isDebugEnabled()) { - _logger.debug("Frame Received: " + frame); + startTime = System.currentTimeMillis(); + frameToString = frame.toString(); + _logger.debug("RECV: " + frame); } // Check that this channel is not closing @@ -340,6 +344,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeChannel(channelId); throw e; } + + if(_logger.isDebugEnabled()) + { + _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString); + } } finally { @@ -543,6 +552,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi final ByteBuffer buf = asByteBuffer(frame); _writtenBytes += buf.remaining(); + + if(_logger.isDebugEnabled()) + { + _logger.debug("SEND: " + frame); + } + _sender.send(buf); final long time = System.currentTimeMillis(); _lastIoTime = time; diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index f11fb1086e..df95ce46d5 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -384,10 +384,20 @@ public class LocalTransaction implements ServerTransaction private void doPostTransactionActions() { + if(_logger.isDebugEnabled()) + { + _logger.debug("Beginning " + _postTransactionActions.size() + " post transaction actions"); + } + for(int i = 0; i < _postTransactionActions.size(); i++) { _postTransactionActions.get(i).postCommit(); } + + if(_logger.isDebugEnabled()) + { + _logger.debug("Completed post transaction actions"); + } } public void rollback() diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 95c3e4669f..d66415c659 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -25,7 +25,6 @@ import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.transport.network.Frame; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; - import static org.apache.qpid.transport.Option.COMPLETED; import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.TIMELY_REPLY; @@ -414,7 +413,7 @@ public class Session extends SessionInvoker if(log.isDebugEnabled()) { - log.debug("ID: [%s] %s", this.channel, id); + log.debug("identify: ch=%s, commandId=%s", this.channel, id); } if ((id & 0xff) == 0) @@ -443,7 +442,7 @@ public class Session extends SessionInvoker { if(log.isDebugEnabled()) { - log.debug("%s processed([%d,%d]) %s %s", this, lower, upper, syncPoint, maxProcessed); + log.debug("%s ch=%s processed([%d,%d]) %s %s", this, channel, lower, upper, syncPoint, maxProcessed); } boolean flush; @@ -451,7 +450,7 @@ public class Session extends SessionInvoker { if(log.isDebugEnabled()) { - log.debug("%s", processed); + log.debug("%s processed: %s", this, processed); } if (ge(upper, commandsIn)) |