summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-13 10:50:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-13 10:50:37 +0000
commit81b0b0fb508770fc88c8a2283b5d497c6efe90dc (patch)
tree1a7885c82a31eb85d5e8b5a944265ea0a281abe1
parent87a4900be441b172e92c4c7117ea4cabf0442195 (diff)
downloadqpid-python-81b0b0fb508770fc88c8a2283b5d497c6efe90dc.tar.gz
Add logging
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1631344 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java164
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java64
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java9
4 files changed, 245 insertions, 5 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 4087b1f4a0..5ca94891c1 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import static org.apache.qpid.transport.util.Functions.hex;
+
import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.PrivilegedAction;
@@ -1855,6 +1857,15 @@ public class AMQChannel
final boolean passive,
final boolean active, final boolean write, final boolean read)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] AccessRequest[" +" realm: " + realm +
+ " exclusive: " + exclusive +
+ " passive: " + passive +
+ " active: " + active +
+ " write: " + write + " read: " + read + " ]");
+ }
+
MethodRegistry methodRegistry = _connection.getMethodRegistry();
if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()))
@@ -1876,12 +1887,23 @@ public class AMQChannel
@Override
public void receiveBasicAck(final long deliveryTag, final boolean multiple)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicAck[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]");
+ }
+
acknowledgeMessage(deliveryTag, multiple);
}
@Override
public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait)
{
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicCancel[" +" consumerTag: " + consumerTag + " noWait: " + nowait + " ]");
+ }
+
unsubscribeConsumer(consumerTag);
if (!nowait)
{
@@ -1899,6 +1921,16 @@ public class AMQChannel
final boolean noAck,
final boolean exclusive, final boolean nowait, final FieldTable arguments)
{
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicConsume[" +" queue: " + queue +
+ " consumerTag: " + consumerTag +
+ " noLocal: " + noLocal +
+ " noAck: " + noAck +
+ " exclusive: " + exclusive + " nowait: " + nowait + " arguments: " + arguments + " ]");
+ }
+
AMQShortString consumerTag1 = consumerTag;
VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost();
sync();
@@ -2020,6 +2052,11 @@ public class AMQChannel
@Override
public void receiveBasicGet(final AMQShortString queueName, final boolean noAck)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicGet[" +" queue: " + queueName + " noAck: " + noAck + " ]");
+ }
+
VirtualHostImpl vHost = _connection.getVirtualHost();
sync();
AMQQueue queue = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName.toString());
@@ -2080,6 +2117,14 @@ public class AMQChannel
final boolean mandatory,
final boolean immediate)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicPublish[" +" exchange: " + exchangeName +
+ " routingKey: " + routingKey +
+ " mandatory: " + mandatory +
+ " immediate: " + immediate + " ]");
+ }
+
VirtualHostImpl vHost = _connection.getVirtualHost();
MessageDestination destination;
@@ -2121,6 +2166,11 @@ public class AMQChannel
@Override
public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicQos[" +" prefetchSize: " + prefetchSize + " prefetchCount: " + prefetchCount + " global: " + global + " ]");
+ }
+
sync();
setCredit(prefetchSize, prefetchCount);
@@ -2133,6 +2183,11 @@ public class AMQChannel
@Override
public void receiveBasicRecover(final boolean requeue, final boolean sync)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicRecover[" + " requeue: " + requeue + " sync: " + sync + " ]");
+ }
+
resend();
if (sync)
@@ -2149,6 +2204,11 @@ public class AMQChannel
@Override
public void receiveBasicReject(final long deliveryTag, final boolean requeue)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicReject[" +" deliveryTag: " + deliveryTag + " requeue: " + requeue + " ]");
+ }
+
MessageInstance message = getUnacknowledgedMessageMap().get(deliveryTag);
if (message == null)
@@ -2228,6 +2288,12 @@ public class AMQChannel
final int classId,
final int methodId)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ChannelClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]");
+ }
+
+
sync();
_connection.closeChannel(this);
@@ -2238,12 +2304,21 @@ public class AMQChannel
@Override
public void receiveChannelCloseOk()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ChannelCloseOk");
+ }
+
_connection.closeChannelOk(getChannelId());
}
@Override
public void receiveMessageContent(final byte[] data)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] MessageContent[" +" data: " + hex(data,_connection.getBinaryDataLimit()) + " ] ");
+ }
if(hasCurrentMessage())
{
@@ -2260,6 +2335,11 @@ public class AMQChannel
@Override
public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] MessageHeader[ properties: {" + properties + "} bodySize: " + bodySize + " ]");
+ }
+
if(hasCurrentMessage())
{
publishContentHeader(new ContentHeaderBody(properties, bodySize));
@@ -2281,6 +2361,12 @@ public class AMQChannel
@Override
public void receiveChannelFlow(final boolean active)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ChannelFlow[" +" active: " + active + " ]");
+ }
+
+
sync();
setSuspended(!active);
@@ -2293,6 +2379,11 @@ public class AMQChannel
@Override
public void receiveChannelFlowOk(final boolean active)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ChannelFlowOk[" +" active: " + active + " ]");
+ }
+
// TODO - should we do anything here?
}
@@ -2301,6 +2392,12 @@ public class AMQChannel
final AMQShortString routingKey,
final AMQShortString queueName)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ExchangeBound[" +" exchange: " + exchangeName + " routingKey: " +
+ routingKey + " queue: " + queueName + " ]");
+ }
+
VirtualHostImpl virtualHost = _connection.getVirtualHost();
MethodRegistry methodRegistry = _connection.getMethodRegistry();
@@ -2476,6 +2573,16 @@ public class AMQChannel
final boolean nowait,
final FieldTable arguments)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ExchangeDeclare[" +" exchange: " + exchangeName +
+ " type: " + type +
+ " passive: " + passive +
+ " durable: " + durable +
+ " autoDelete: " + autoDelete +
+ " internal: " + internal + " nowait: " + nowait + " arguments: " + arguments + " ]");
+ }
+
ExchangeImpl exchange;
VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost();
if (isDefaultExchange(exchangeName))
@@ -2620,6 +2727,12 @@ public class AMQChannel
@Override
public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ExchangeDelete[" +" exchange: " + exchangeStr + " ifUnused: " + ifUnused + " nowait: " + nowait + " ]");
+ }
+
+
VirtualHostImpl virtualHost = _connection.getVirtualHost();
sync();
try
@@ -2673,6 +2786,14 @@ public class AMQChannel
final boolean nowait,
final FieldTable argumentsTable)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] QueueBind[" +" queue: " + queueName +
+ " exchange: " + exchange +
+ " bindingKey: " + routingKey +
+ " nowait: " + nowait + " arguments: " + argumentsTable + " ]");
+ }
+
VirtualHostImpl virtualHost = _connection.getVirtualHost();
AMQQueue<?> queue;
if (queueName == null)
@@ -2777,6 +2898,15 @@ public class AMQChannel
final boolean nowait,
final FieldTable arguments)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] QueueDeclare[" +" queue: " + queueStr +
+ " passive: " + passive +
+ " durable: " + durable +
+ " exclusive: " + exclusive +
+ " autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]");
+ }
+
VirtualHostImpl virtualHost = _connection.getVirtualHost();
final AMQShortString queueName;
@@ -2966,6 +3096,11 @@ public class AMQChannel
final boolean ifEmpty,
final boolean nowait)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] QueueDelete[" +" queue: " + queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + nowait + " ]");
+ }
+
VirtualHostImpl virtualHost = _connection.getVirtualHost();
sync();
AMQQueue queue;
@@ -3028,6 +3163,11 @@ public class AMQChannel
@Override
public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] QueuePurge[" +" queue: " + queueName + " nowait: " + nowait + " ]");
+ }
+
VirtualHostImpl virtualHost = _connection.getVirtualHost();
AMQQueue queue = null;
if (queueName == null && (queue = getDefaultQueue()) == null)
@@ -3073,6 +3213,14 @@ public class AMQChannel
final AMQShortString routingKey,
final FieldTable arguments)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] QueueUnbind[" +" queue: " + queueName +
+ " exchange: " + exchange +
+ " bindingKey: " + routingKey +
+ " arguments: " + arguments + " ]");
+ }
+
VirtualHostImpl virtualHost = _connection.getVirtualHost();
@@ -3132,6 +3280,11 @@ public class AMQChannel
@Override
public void receiveTxSelect()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] TxSelect");
+ }
+
setLocalTransactional();
MethodRegistry methodRegistry = _connection.getMethodRegistry();
@@ -3143,6 +3296,12 @@ public class AMQChannel
@Override
public void receiveTxCommit()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] TxCommit");
+ }
+
+
if (!isTransactional())
{
closeChannel(AMQConstant.COMMAND_INVALID,
@@ -3165,6 +3324,11 @@ public class AMQChannel
@Override
public void receiveTxRollback()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] TxRollback");
+ }
+
if (!isTransactional())
{
closeChannel(AMQConstant.COMMAND_INVALID,
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index abb68977ae..413cf49eaf 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -99,6 +99,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
+ public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
+ public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
private final Port<?> _port;
private final long _creationTime;
@@ -180,6 +182,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private int _messageCompressionThreshold;
private int _currentClassId;
private int _currentMethodId;
+ private int _binaryDataLimit;
public AMQProtocolEngine(Broker broker,
final NetworkConnection network,
@@ -195,7 +198,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_decoder = new BrokerDecoder(this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
-
+ _binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH)
+ ? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH)
+ : DEFAULT_DEBUG_BINARY_DATA_LENGTH;
_authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
runAsSubject(new PrivilegedAction<Void>()
{
@@ -1365,6 +1370,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
@Override
public void receiveChannelOpen(final int channelId)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + channelId + "] ChannelOpen");
+ }
+
// Protect the broker against out of order frame request.
if (_virtualHost == null)
{
@@ -1405,6 +1415,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
AMQShortString capabilities,
boolean insist)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionOpen[" +" virtualHost: " + virtualHostName + " capabilities: " + capabilities + " insist: " + insist + " ]");
+ }
+
String virtualHostStr;
if ((virtualHostName != null) && virtualHostName.charAt(0) == '/')
{
@@ -1462,6 +1477,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
final int classId,
final int methodId)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]");
+ }
+
if (_logger.isInfoEnabled())
{
_logger.info("ConnectionClose received with reply code/reply text " + replyCode + "/" +
@@ -1487,6 +1507,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
@Override
public void receiveConnectionCloseOk()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionCloseOk");
+ }
_logger.info("Received Connection-close-ok");
@@ -1503,6 +1527,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
@Override
public void receiveConnectionSecureOk(final byte[] response)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionSecureOk[ response: ******** ] ");
+ }
Broker<?> broker = getBroker();
@@ -1579,6 +1607,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
final byte[] response,
final AMQShortString locale)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionStartOk["
+ + " clientProperties: "
+ + clientProperties
+ + " mechanism: "
+ + mechanism
+ + " response: ********"
+ + " locale: "
+ + locale
+ + " ]");
+ }
+
Broker<?> broker = getBroker();
_logger.info("SASL Mechanism selected: " + mechanism);
@@ -1658,6 +1699,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
@Override
public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ConnectionTuneOk[" +" channelMax: " + channelMax + " frameMax: " + frameMax + " heartbeat: " + heartbeat + " ]");
+ }
+
initHeartbeats(heartbeat);
int brokerFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
@@ -1692,6 +1738,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
}
+ public int getBinaryDataLimit()
+ {
+ return _binaryDataLimit;
+ }
+
public final class WriteDeliverMethod
implements ClientDeliveryMethod
{
@@ -1810,12 +1861,23 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
@Override
public void receiveHeartbeat()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV Heartbeat");
+ }
+
// No op
}
@Override
public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
{
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV ProtocolHeader [" + protocolInitiation + " ]");
+ }
+
protocolInitiationReceived(protocolInitiation);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
index 5a4466b003..6497a04946 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
@@ -26,6 +26,8 @@ import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.codec.ServerDecoder;
import org.apache.qpid.framing.AMQFrameDecodingException;
@@ -33,6 +35,7 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class BrokerDecoder extends ServerDecoder
{
+ private static final Logger _logger = Logger.getLogger(BrokerDecoder.class);
private final AMQProtocolEngine _connection;
/**
* Creates a new AMQP decoder.
@@ -49,6 +52,11 @@ public class BrokerDecoder extends ServerDecoder
protected void processFrame(final int channelId, final byte type, final long bodySize, final MarkableDataInput in)
throws AMQFrameDecodingException, IOException
{
+ long startTime = 0;
+ if (_logger.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ }
Subject subject;
AMQChannel channel = _connection.getChannel(channelId);
if(channel == null)
@@ -72,6 +80,11 @@ public class BrokerDecoder extends ServerDecoder
return null;
}
});
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms.");
+ }
+
}
catch (PrivilegedActionException e)
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 0bb72aa88f..ed1935ca04 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -20,14 +20,15 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.MarkableDataInput;
-
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
+
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -227,7 +228,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
public String toString()
{
- StringBuffer buffer = new StringBuffer(new String(_protocolHeader));
+ StringBuffer buffer = new StringBuffer(new String(_protocolHeader, StandardCharsets.US_ASCII));
buffer.append(Integer.toHexString(_protocolClass));
buffer.append(Integer.toHexString(_protocolInstance));
buffer.append(Integer.toHexString(_protocolMajor));