diff options
author | Keith Wall <kwall@apache.org> | 2014-08-03 18:05:16 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-08-03 18:05:16 +0000 |
commit | 20beab35b19f246a88ec79828f788e512628252c (patch) | |
tree | df020e25320a3e974b8c01b960cd5c77f13e4327 | |
parent | 6ff0bc6c8a2b191519b3186ab997f9eb9aa309cb (diff) | |
download | qpid-python-20beab35b19f246a88ec79828f788e512628252c.tar.gz |
QPID-5957: [Java Broker] Extend the Session model object to expose transaction start and update times
* Expose the same information on the Connection tab within the Web Management UI.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1615424 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 185 insertions, 23 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java index 686ded9774..8fdd7dcdcc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java @@ -55,5 +55,19 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X> @ManagedStatistic long getUnacknowledgedMessages(); - void delete(); + /** + * Return the time the current transaction started. + * + * @return the time this transaction started or 0 if not in a transaction + */ + @ManagedStatistic + long getTransactionStartTime(); + + /** + * Return the time of the last activity on the current transaction. + * + * @return the time of the last activity or 0 if not in a transaction + */ + @ManagedStatistic + long getTransactionUpdateTime(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index c2ce50764a..9c2759bd1d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -41,9 +41,7 @@ import org.apache.qpid.server.util.Action; final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter> { // Attributes - - - private AMQSessionModel _session; + private final AMQSessionModel _session; private State _state = State.ACTIVE; @@ -175,6 +173,18 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl return _session.getUnacknowledgedMessageCount(); } + @Override + public long getTransactionStartTime() + { + return _session.getTransactionStartTime(); + } + + @Override + public long getTransactionUpdateTime() + { + return _session.getTransactionUpdateTime(); + } + @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) private void doDelete() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index 0df40bfff6..a9cd32f8f9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -99,4 +99,18 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo void setModelObject(Session<?> session); Session<?> getModelObject(); + + /** + * Return the time the current transaction started. + * + * @return the time this transaction started or 0 if not in a transaction + */ + long getTransactionStartTime(); + + /** + * Return the time of the last activity on the current transaction. + * + * @return the time of the last activity or 0 if not in a transaction + */ + long getTransactionUpdateTime(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 187b2bf569..3fe1515b18 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -1020,6 +1020,34 @@ public class ServerSession extends Session return _modelObject; } + @Override + public long getTransactionStartTime() + { + ServerTransaction serverTransaction = _transaction; + if (serverTransaction.isTransactional()) + { + return serverTransaction.getTransactionStartTime(); + } + else + { + return 0L; + } + } + + @Override + public long getTransactionUpdateTime() + { + ServerTransaction serverTransaction = _transaction; + if (serverTransaction.isTransactional()) + { + return serverTransaction.getTransactionUpdateTime(); + } + else + { + return 0L; + } + } + private void consumerAdded(Consumer<?> consumer) { for(ConsumerListener l : _consumerListeners) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index ae6d607102..b6e1b7dd6a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1764,4 +1764,32 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { return _modelObject; } + + @Override + public long getTransactionStartTime() + { + ServerTransaction serverTransaction = _transaction; + if (serverTransaction.isTransactional()) + { + return serverTransaction.getTransactionStartTime(); + } + else + { + return 0L; + } + } + + @Override + public long getTransactionUpdateTime() + { + ServerTransaction serverTransaction = _transaction; + if (serverTransaction.isTransactional()) + { + return serverTransaction.getTransactionUpdateTime(); + } + else + { + return 0L; + } + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index b870eaf630..379dcb01f2 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -95,7 +95,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private static final Logger _logger = Logger.getLogger(Session_1_0.class); private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); private final SessionEndpoint _endpoint; - private VirtualHostImpl _vhost; private AutoCommitTransaction _transaction; private final LinkedHashMap<Integer, ServerTransaction> _openTransactions = @@ -866,6 +865,18 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio return _modelObject; } + @Override + public long getTransactionStartTime() + { + return 0L; + } + + @Override + public long getTransactionUpdateTime() + { + return 0L; + } + private void consumerAdded(Consumer<?> consumer) { for(ConsumerListener l : _consumerListeners) diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js index e2c73a5d41..8836f6a40c 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js @@ -27,9 +27,10 @@ define(["dojo/_base/xhr", "qpid/common/util", "qpid/common/formatter", "qpid/common/UpdatableStore", + "qpid/management/UserPreferences", "dojox/html/entities", "dojo/domReady!"], - function (xhr, parser, query, connect, properties, updater, util, formatter, UpdatableStore, entities) { + function (xhr, parser, query, connect, properties, updater, util, formatter, UpdatableStore, UserPreferences, entities) { function Connection(name, parent, controller) { this.name = name; @@ -105,12 +106,35 @@ define(["dojo/_base/xhr", that.updateHeader(); that.sessionsGrid = new UpdatableStore(that.connectionData.sessions, findNode("sessions"), - [ { name: "Name", field: "name", width: "70px"}, - { name: "Mode", field: "distributionMode", width: "70px"}, - { name: "Msgs Rate", field: "msgRate", - width: "150px"}, - { name: "Bytes Rate", field: "bytesRate", - width: "100%"} + [ { name: "Name", field: "name", width: "70px"}, + { name: "Consumers", field: "consumerCount", width: "90px"}, + { name: "Unacknowledged messages", field: "unacknowledgedMessages", width: "110px"}, + { name: "Current store transaction start", field: "transactionStartTime", width: "200px", + formatter: function (transactionStartTime) + { + if (transactionStartTime > 0) + { + return UserPreferences.formatDateTime(transactionStartTime, {selector: "time", addOffset: true, appendTimeZone: true}); + } + else + { + return "N/A"; + } + } + }, + { name: "Current store transaction update", field: "transactionUpdateTime", width: "100%", + formatter: function (transactionUpdateTime) + { + if (transactionUpdateTime > 0) + { + return UserPreferences.formatDateTime(transactionUpdateTime, {selector: "time", addOffset: true, appendTimeZone: true}); + } + else + { + return "N/A"; + } + } + } ]); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java index 5be3be615f..ad4a370a04 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java @@ -32,6 +32,8 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.servlet.http.HttpServletResponse; +import org.junit.Assert; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.server.model.ConfiguredObject; @@ -88,6 +90,8 @@ public class ConnectionRestTest extends QpidRestTestCase m = consumer.receive(1000l); assertNotNull("Message was not received after rollback", m); } + + // Session left open } public void testGetAllConnections() throws Exception @@ -175,6 +179,34 @@ public class ConnectionRestTest extends QpidRestTestCase assertSession(sessions.get(0), (AMQSession<?, ?>) _session); } + public void testProducerSessionOpenHasTransactionStartAndUpdateTimes() throws Exception + { + Destination queue = _session.createQueue(getTestQueueName()); + MessageProducer producer = _session.createProducer(queue); + producer.send(_session.createMessage()); + // session left open + + String connectionName = getConnectionName(); + + List<Map<String, Object>> sessions = getRestTestHelper().getJsonAsList("session/test/test/" + + URLDecoder.decode(connectionName, "UTF-8") + "/" + ((AMQSession<?, ?>) _session).getChannelId()); + assertEquals("Unexpected number of sessions", 1, sessions.size()); + + final Map<String, Object> sessionData = sessions.get(0); + + @SuppressWarnings("unchecked") + Map<String, Object> statistics = (Map<String, Object>) sessionData.get(Asserts.STATISTICS_ATTRIBUTE); + + long transactionStartTime = ((Number) statistics.get("transactionStartTime")).longValue(); + long transactionUpdateTime = ((Number) statistics.get("transactionUpdateTime")).longValue(); + + assertTrue("Unexpected transaction start value for open transaction " + transactionStartTime, transactionStartTime > 0); + assertTrue("Unexpected transaction update value for open transaction " + transactionUpdateTime, transactionUpdateTime > 0); + assertTrue("Expected transaction update value " + transactionUpdateTime + " to be greater than transaction start time " + transactionStartTime, transactionUpdateTime >= transactionStartTime); + + + } + private void assertConnection(Map<String, Object> connectionDetails) throws JMSException { Asserts.assertConnection(connectionDetails, (AMQConnection) _connection); @@ -212,27 +244,28 @@ public class ConnectionRestTest extends QpidRestTestCase Session.STATE, Session.DURABLE, Session.LIFETIME_POLICY); - assertEquals("Unexpecte value of attribute " + Session.NAME, session.getChannelId() + "", + assertEquals("Unexpected value of attribute " + Session.NAME, session.getChannelId() + "", sessionData.get(Session.NAME)); - assertEquals("Unexpecte value of attribute " + Session.PRODUCER_FLOW_BLOCKED, Boolean.FALSE, + assertEquals("Unexpected value of attribute " + Session.PRODUCER_FLOW_BLOCKED, Boolean.FALSE, sessionData.get(Session.PRODUCER_FLOW_BLOCKED)); - assertEquals("Unexpecte value of attribute " + Session.CHANNEL_ID, session.getChannelId(), + assertEquals("Unexpected value of attribute " + Session.CHANNEL_ID, session.getChannelId(), sessionData.get(Session.CHANNEL_ID)); @SuppressWarnings("unchecked") Map<String, Object> statistics = (Map<String, Object>) sessionData.get(Asserts.STATISTICS_ATTRIBUTE); Asserts.assertAttributesPresent(statistics, "consumerCount", "localTransactionBegins", "localTransactionOpen", - "localTransactionRollbacks", "unacknowledgedMessages"); + "localTransactionRollbacks", "unacknowledgedMessages", + "transactionStartTime", "transactionUpdateTime"); - assertEquals("Unexpecte value of statistic attribute " + "unacknowledgedMessages", MESSAGE_NUMBER - 1, + assertEquals("Unexpected value of statistic attribute " + "unacknowledgedMessages", MESSAGE_NUMBER - 1, statistics.get("unacknowledgedMessages")); - assertEquals("Unexpecte value of statistic attribute " + "localTransactionBegins", 4, - statistics.get("localTransactionBegins")); - assertEquals("Unexpecte value of statistic attribute " + "localTransactionRollbacks", 1, - statistics.get("localTransactionRollbacks")); - assertEquals("Unexpecte value of statistic attribute " + "consumerCount", 1, - statistics.get("consumerCount")); + assertEquals("Unexpected value of statistic attribute " + "localTransactionBegins", 4, + statistics.get("localTransactionBegins")); + assertEquals("Unexpected value of statistic attribute " + "localTransactionRollbacks", 1, + statistics.get("localTransactionRollbacks")); + assertEquals("Unexpected value of statistic attribute " + "consumerCount", 1, + statistics.get("consumerCount")); } private String getConnectionName() throws IOException |