summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-03-04 11:18:30 +0000
committerRobert Gemmell <robbie@apache.org>2010-03-04 11:18:30 +0000
commit6f7e476ee694881cecccbe73ae7011c1cdba7ee0 (patch)
tree25321423925d7a71596867ce954e9866117d284b
parent7c9e337a5a364d5946c1fdfc3bb65677b61ec2df (diff)
downloadqpid-python-6f7e476ee694881cecccbe73ae7011c1cdba7ee0.tar.gz
QPID-2379: add BytesTxnEnqueues and MsgTxnEnqueues on Queue delegate
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@918941 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java10
11 files changed, 92 insertions, 14 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
index c250b2c011..de9bf1e9cb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
@@ -21,6 +21,7 @@
package org.apache.qpid.qmf;
+import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.message.*;
import org.apache.qpid.transport.codec.BBEncoder;
@@ -202,4 +203,9 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead
}
}
+ public SessionConfig getSessionConfig()
+ {
+ return null;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index b6c06f7f34..3f1f354585 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -912,8 +912,7 @@ public class QMFService implements ConfigStore.ConfigEventListener
public Long getMsgTxnEnqueues()
{
- // TODO
- return 0l;
+ return _obj.getMsgTxnEnqueues();
}
public Long getMsgTxnDequeues()
@@ -954,8 +953,7 @@ public class QMFService implements ConfigStore.ConfigEventListener
public Long getByteTxnEnqueues()
{
- // TODO
- return 0l;
+ return _obj.getByteTxnEnqueues();
}
public Long getByteTxnDequeues()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 15ac52305f..ec1b22270f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -312,13 +312,13 @@ public class AMQChannel implements SessionConfig
}
else
{
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
+ _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage,isTransactional()));
}
}
else
{
- _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
+ _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional()));
incrementOutstandingTxnsIfNecessary();
}
}
@@ -1030,7 +1030,7 @@ public class AMQChannel implements SessionConfig
}
- private AMQMessage createAMQMessage(IncomingMessage incomingMessage)
+ private AMQMessage createAMQMessage(IncomingMessage incomingMessage, boolean transactional)
throws AMQException
{
@@ -1054,12 +1054,15 @@ public class AMQChannel implements SessionConfig
private class MessageDeliveryAction implements ServerTransaction.Action
{
+ private boolean _transactional;
private IncomingMessage _incommingMessage;
private ArrayList<? extends BaseQueue> _destinationQueues;
public MessageDeliveryAction(IncomingMessage currentMessage,
- ArrayList<? extends BaseQueue> destinationQueues)
+ ArrayList<? extends BaseQueue> destinationQueues,
+ boolean transactional)
{
+ _transactional = transactional;
_incommingMessage = currentMessage;
_destinationQueues = destinationQueues;
}
@@ -1070,7 +1073,7 @@ public class AMQChannel implements SessionConfig
{
final boolean immediate = _incommingMessage.isImmediate();
- final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
+ final AMQMessage amqMessage = createAMQMessage(_incommingMessage, _transactional);
MessageReference ref = amqMessage.newReference();
for(final BaseQueue queue : _destinationQueues)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
index 8a5559c155..0b5df9b8fa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
@@ -49,6 +49,8 @@ public interface QueueConfig extends ConfiguredObject<QueueConfigType, QueueConf
int getConsumerCountHigh();
int getBindingCount();
+
+ int getBindingCountHigh();
ConfigStore getConfigStore();
@@ -57,8 +59,10 @@ public interface QueueConfig extends ConfiguredObject<QueueConfigType, QueueConf
long getTotalEnqueueSize();
long getTotalDequeueSize();
+
+ long getByteTxnEnqueues();
- int getBindingCountHigh();
+ long getMsgTxnEnqueues();
long getPersistentByteEnqueues();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
index e46e951588..5e5dd10e57 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
@@ -46,4 +46,6 @@ public interface SessionConfig extends ConfiguredObject<SessionConfigType, Sessi
Long getTxnRejects();
Long getTxnCount();
+
+ boolean isTransactional();
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index b8a36aba58..e0c181a5fc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -24,11 +24,14 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.queue.AMQQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
/**
@@ -63,9 +66,16 @@ public class AMQMessage implements ServerMessage
private final StoredMessage<MessageMetaData> _handle;
+ WeakReference<AMQChannel> _channelRef;
+
public AMQMessage(StoredMessage<MessageMetaData> handle)
{
+ this(handle, null);
+ }
+
+ public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
+ {
_handle = handle;
final MessageMetaData metaData = handle.getMetaData();
_size = metaData.getContentSize();
@@ -75,6 +85,8 @@ public class AMQMessage implements ServerMessage
{
_flags |= IMMEDIATE;
}
+
+ _channelRef = channelRef;
}
@@ -326,4 +338,9 @@ public class AMQMessage implements ServerMessage
{
return _handle;
}
+
+ public SessionConfig getSessionConfig()
+ {
+ return _channelRef == null ? null : ((SessionConfig) _channelRef.get());
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
index baa5dce14f..08006435f8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
@@ -21,9 +21,10 @@
package org.apache.qpid.server.message;
import org.apache.qpid.transport.*;
+import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.transport.ServerSession;
-import java.util.concurrent.atomic.AtomicLong;
import java.nio.ByteBuffer;
import java.lang.ref.WeakReference;
@@ -37,13 +38,11 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
private WeakReference<Session> _sessionRef;
-
public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
{
_storeMessage = storeMessage;
_sessionRef = sessionRef;
-
}
private MessageMetaData_0_10 getMetaData()
@@ -142,5 +141,9 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
return _sessionRef == null ? null : _sessionRef.get();
}
+ public SessionConfig getSessionConfig()
+ {
+ return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 1ac538c15b..2f2d39115f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.message;
import java.nio.ByteBuffer;
+import org.apache.qpid.server.configuration.SessionConfig;
+
public interface ServerMessage extends EnqueableMessage, MessageContentSource
{
String getRoutingKey();
@@ -44,4 +46,5 @@ public interface ServerMessage extends EnqueableMessage, MessageContentSource
public int getContent(ByteBuffer buf, int offset);
+ SessionConfig getSessionConfig();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index c64b9047de..d99b551936 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -12,6 +12,7 @@ import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.QueueConfigType;
import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -122,6 +123,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
private final AtomicInteger _counsumerCountHigh = new AtomicInteger(0);
+ private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
+ private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -516,7 +519,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
-
+ incrementTxnEnqueueStats(message);
incrementQueueCount();
incrementQueueSize(message);
_totalMessagesReceived.incrementAndGet();
@@ -666,6 +669,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
getAtomicQueueCount().incrementAndGet();
}
+
+ private void incrementTxnEnqueueStats(final ServerMessage message)
+ {
+ SessionConfig session = message.getSessionConfig();
+
+ if(session !=null && session.isTransactional())
+ {
+ _msgTxnEnqueues.incrementAndGet();
+ _byteTxnEnqueues.addAndGet(message.getSize());
+ }
+ }
private void deliverMessage(final Subscription sub, final QueueEntry entry)
throws AMQException
@@ -2064,6 +2078,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _dequeueSize.get();
}
+
+ public long getByteTxnEnqueues()
+ {
+ return _byteTxnEnqueues.get();
+ }
+
+ public long getMsgTxnEnqueues()
+ {
+ return _msgTxnEnqueues.get();
+ }
public long getPersistentByteEnqueues()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index a65f3938a6..99c3572a2f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -388,6 +388,14 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
sub.releaseSendLock();
}
}
+
+ public boolean isTransactional()
+ {
+ // this does not look great but there should only be one "non-transactional"
+ // transactional context, while there could be several transactional ones in
+ // theory
+ return !(_transaction instanceof AutoCommitTransaction);
+ }
public void selectTx()
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 1a2bcd87c5..b0fbab4146 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -552,4 +552,14 @@ public class MockAMQQueue implements AMQQueue
{
return 0;
}
+
+ public long getByteTxnEnqueues()
+ {
+ return 0;
+ }
+
+ public long getMsgTxnEnqueues()
+ {
+ return 0;
+ }
}