diff options
author | Robert Gemmell <robbie@apache.org> | 2010-03-04 11:18:30 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2010-03-04 11:18:30 +0000 |
commit | 6f7e476ee694881cecccbe73ae7011c1cdba7ee0 (patch) | |
tree | 25321423925d7a71596867ce954e9866117d284b | |
parent | 7c9e337a5a364d5946c1fdfc3bb65677b61ec2df (diff) | |
download | qpid-python-6f7e476ee694881cecccbe73ae7011c1cdba7ee0.tar.gz |
QPID-2379: add BytesTxnEnqueues and MsgTxnEnqueues on Queue delegate
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@918941 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 92 insertions, 14 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java index c250b2c011..de9bf1e9cb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java @@ -21,6 +21,7 @@ package org.apache.qpid.qmf; +import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.message.*; import org.apache.qpid.transport.codec.BBEncoder; @@ -202,4 +203,9 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead } } + public SessionConfig getSessionConfig() + { + return null; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index b6c06f7f34..3f1f354585 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -912,8 +912,7 @@ public class QMFService implements ConfigStore.ConfigEventListener public Long getMsgTxnEnqueues() { - // TODO - return 0l; + return _obj.getMsgTxnEnqueues(); } public Long getMsgTxnDequeues() @@ -954,8 +953,7 @@ public class QMFService implements ConfigStore.ConfigEventListener public Long getByteTxnEnqueues() { - // TODO - return 0l; + return _obj.getByteTxnEnqueues(); } public Long getByteTxnDequeues() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 15ac52305f..ec1b22270f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -312,13 +312,13 @@ public class AMQChannel implements SessionConfig } else { - _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage)); + _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage,isTransactional())); } } else { - _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues)); + _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional())); incrementOutstandingTxnsIfNecessary(); } } @@ -1030,7 +1030,7 @@ public class AMQChannel implements SessionConfig } - private AMQMessage createAMQMessage(IncomingMessage incomingMessage) + private AMQMessage createAMQMessage(IncomingMessage incomingMessage, boolean transactional) throws AMQException { @@ -1054,12 +1054,15 @@ public class AMQChannel implements SessionConfig private class MessageDeliveryAction implements ServerTransaction.Action { + private boolean _transactional; private IncomingMessage _incommingMessage; private ArrayList<? extends BaseQueue> _destinationQueues; public MessageDeliveryAction(IncomingMessage currentMessage, - ArrayList<? extends BaseQueue> destinationQueues) + ArrayList<? extends BaseQueue> destinationQueues, + boolean transactional) { + _transactional = transactional; _incommingMessage = currentMessage; _destinationQueues = destinationQueues; } @@ -1070,7 +1073,7 @@ public class AMQChannel implements SessionConfig { final boolean immediate = _incommingMessage.isImmediate(); - final AMQMessage amqMessage = createAMQMessage(_incommingMessage); + final AMQMessage amqMessage = createAMQMessage(_incommingMessage, _transactional); MessageReference ref = amqMessage.newReference(); for(final BaseQueue queue : _destinationQueues) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java index 8a5559c155..0b5df9b8fa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java @@ -49,6 +49,8 @@ public interface QueueConfig extends ConfiguredObject<QueueConfigType, QueueConf int getConsumerCountHigh(); int getBindingCount(); + + int getBindingCountHigh(); ConfigStore getConfigStore(); @@ -57,8 +59,10 @@ public interface QueueConfig extends ConfiguredObject<QueueConfigType, QueueConf long getTotalEnqueueSize(); long getTotalDequeueSize(); + + long getByteTxnEnqueues(); - int getBindingCountHigh(); + long getMsgTxnEnqueues(); long getPersistentByteEnqueues(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java index e46e951588..5e5dd10e57 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java @@ -46,4 +46,6 @@ public interface SessionConfig extends ConfiguredObject<SessionConfigType, Sessi Long getTxnRejects(); Long getTxnCount(); + + boolean isTransactional(); }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index b8a36aba58..e0c181a5fc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -24,11 +24,14 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.queue.AMQQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.lang.ref.WeakReference; import java.nio.ByteBuffer; /** @@ -63,9 +66,16 @@ public class AMQMessage implements ServerMessage private final StoredMessage<MessageMetaData> _handle; + WeakReference<AMQChannel> _channelRef; + public AMQMessage(StoredMessage<MessageMetaData> handle) { + this(handle, null); + } + + public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef) + { _handle = handle; final MessageMetaData metaData = handle.getMetaData(); _size = metaData.getContentSize(); @@ -75,6 +85,8 @@ public class AMQMessage implements ServerMessage { _flags |= IMMEDIATE; } + + _channelRef = channelRef; } @@ -326,4 +338,9 @@ public class AMQMessage implements ServerMessage { return _handle; } + + public SessionConfig getSessionConfig() + { + return _channelRef == null ? null : ((SessionConfig) _channelRef.get()); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java index baa5dce14f..08006435f8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java @@ -21,9 +21,10 @@ package org.apache.qpid.server.message; import org.apache.qpid.transport.*; +import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.transport.ServerSession; -import java.util.concurrent.atomic.AtomicLong; import java.nio.ByteBuffer; import java.lang.ref.WeakReference; @@ -37,13 +38,11 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage private WeakReference<Session> _sessionRef; - public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef) { _storeMessage = storeMessage; _sessionRef = sessionRef; - } private MessageMetaData_0_10 getMetaData() @@ -142,5 +141,9 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage return _sessionRef == null ? null : _sessionRef.get(); } + public SessionConfig getSessionConfig() + { + return _sessionRef == null ? null : (ServerSession) _sessionRef.get(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java index 1ac538c15b..2f2d39115f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.message; import java.nio.ByteBuffer; +import org.apache.qpid.server.configuration.SessionConfig; + public interface ServerMessage extends EnqueableMessage, MessageContentSource { String getRoutingKey(); @@ -44,4 +46,5 @@ public interface ServerMessage extends EnqueableMessage, MessageContentSource public int getContent(ByteBuffer buf, int offset); + SessionConfig getSessionConfig(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index c64b9047de..d99b551936 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -12,6 +12,7 @@ import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.QueueConfigType; import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -122,6 +123,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong(); private final AtomicLong _persistentMessageDequeueCount = new AtomicLong(); private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0); + private final AtomicLong _msgTxnEnqueues = new AtomicLong(0); + private final AtomicLong _byteTxnEnqueues = new AtomicLong(0); private final AtomicInteger _bindingCountHigh = new AtomicInteger(); @@ -516,7 +519,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException { - + incrementTxnEnqueueStats(message); incrementQueueCount(); incrementQueueSize(message); _totalMessagesReceived.incrementAndGet(); @@ -666,6 +669,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { getAtomicQueueCount().incrementAndGet(); } + + private void incrementTxnEnqueueStats(final ServerMessage message) + { + SessionConfig session = message.getSessionConfig(); + + if(session !=null && session.isTransactional()) + { + _msgTxnEnqueues.incrementAndGet(); + _byteTxnEnqueues.addAndGet(message.getSize()); + } + } private void deliverMessage(final Subscription sub, final QueueEntry entry) throws AMQException @@ -2064,6 +2078,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _dequeueSize.get(); } + + public long getByteTxnEnqueues() + { + return _byteTxnEnqueues.get(); + } + + public long getMsgTxnEnqueues() + { + return _msgTxnEnqueues.get(); + } public long getPersistentByteEnqueues() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index a65f3938a6..99c3572a2f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -388,6 +388,14 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo sub.releaseSendLock(); } } + + public boolean isTransactional() + { + // this does not look great but there should only be one "non-transactional" + // transactional context, while there could be several transactional ones in + // theory + return !(_transaction instanceof AutoCommitTransaction); + } public void selectTx() { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 1a2bcd87c5..b0fbab4146 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -552,4 +552,14 @@ public class MockAMQQueue implements AMQQueue { return 0; } + + public long getByteTxnEnqueues() + { + return 0; + } + + public long getMsgTxnEnqueues() + { + return 0; + } } |