summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-08-03 18:05:16 +0000
committerKeith Wall <kwall@apache.org>2014-08-03 18:05:16 +0000
commit20beab35b19f246a88ec79828f788e512628252c (patch)
treedf020e25320a3e974b8c01b960cd5c77f13e4327
parent6ff0bc6c8a2b191519b3186ab997f9eb9aa309cb (diff)
downloadqpid-python-20beab35b19f246a88ec79828f788e512628252c.tar.gz
QPID-5957: [Java Broker] Extend the Session model object to expose transaction start and update times
* Expose the same information on the Connection tab within the Web Management UI. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1615424 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java28
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java28
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java13
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js38
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java55
8 files changed, 185 insertions, 23 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index 686ded9774..8fdd7dcdcc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -55,5 +55,19 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X>
@ManagedStatistic
long getUnacknowledgedMessages();
- void delete();
+ /**
+ * Return the time the current transaction started.
+ *
+ * @return the time this transaction started or 0 if not in a transaction
+ */
+ @ManagedStatistic
+ long getTransactionStartTime();
+
+ /**
+ * Return the time of the last activity on the current transaction.
+ *
+ * @return the time of the last activity or 0 if not in a transaction
+ */
+ @ManagedStatistic
+ long getTransactionUpdateTime();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index c2ce50764a..9c2759bd1d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -41,9 +41,7 @@ import org.apache.qpid.server.util.Action;
final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
{
// Attributes
-
-
- private AMQSessionModel _session;
+ private final AMQSessionModel _session;
private State _state = State.ACTIVE;
@@ -175,6 +173,18 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
return _session.getUnacknowledgedMessageCount();
}
+ @Override
+ public long getTransactionStartTime()
+ {
+ return _session.getTransactionStartTime();
+ }
+
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return _session.getTransactionUpdateTime();
+ }
+
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
private void doDelete()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index 0df40bfff6..a9cd32f8f9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -99,4 +99,18 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo
void setModelObject(Session<?> session);
Session<?> getModelObject();
+
+ /**
+ * Return the time the current transaction started.
+ *
+ * @return the time this transaction started or 0 if not in a transaction
+ */
+ long getTransactionStartTime();
+
+ /**
+ * Return the time of the last activity on the current transaction.
+ *
+ * @return the time of the last activity or 0 if not in a transaction
+ */
+ long getTransactionUpdateTime();
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 187b2bf569..3fe1515b18 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1020,6 +1020,34 @@ public class ServerSession extends Session
return _modelObject;
}
+ @Override
+ public long getTransactionStartTime()
+ {
+ ServerTransaction serverTransaction = _transaction;
+ if (serverTransaction.isTransactional())
+ {
+ return serverTransaction.getTransactionStartTime();
+ }
+ else
+ {
+ return 0L;
+ }
+ }
+
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ ServerTransaction serverTransaction = _transaction;
+ if (serverTransaction.isTransactional())
+ {
+ return serverTransaction.getTransactionUpdateTime();
+ }
+ else
+ {
+ return 0L;
+ }
+ }
+
private void consumerAdded(Consumer<?> consumer)
{
for(ConsumerListener l : _consumerListeners)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index ae6d607102..b6e1b7dd6a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1764,4 +1764,32 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
return _modelObject;
}
+
+ @Override
+ public long getTransactionStartTime()
+ {
+ ServerTransaction serverTransaction = _transaction;
+ if (serverTransaction.isTransactional())
+ {
+ return serverTransaction.getTransactionStartTime();
+ }
+ else
+ {
+ return 0L;
+ }
+ }
+
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ ServerTransaction serverTransaction = _transaction;
+ if (serverTransaction.isTransactional())
+ {
+ return serverTransaction.getTransactionUpdateTime();
+ }
+ else
+ {
+ return 0L;
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index b870eaf630..379dcb01f2 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -95,7 +95,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
private static final Logger _logger = Logger.getLogger(Session_1_0.class);
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private final SessionEndpoint _endpoint;
- private VirtualHostImpl _vhost;
private AutoCommitTransaction _transaction;
private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
@@ -866,6 +865,18 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
return _modelObject;
}
+ @Override
+ public long getTransactionStartTime()
+ {
+ return 0L;
+ }
+
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0L;
+ }
+
private void consumerAdded(Consumer<?> consumer)
{
for(ConsumerListener l : _consumerListeners)
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
index e2c73a5d41..8836f6a40c 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
@@ -27,9 +27,10 @@ define(["dojo/_base/xhr",
"qpid/common/util",
"qpid/common/formatter",
"qpid/common/UpdatableStore",
+ "qpid/management/UserPreferences",
"dojox/html/entities",
"dojo/domReady!"],
- function (xhr, parser, query, connect, properties, updater, util, formatter, UpdatableStore, entities) {
+ function (xhr, parser, query, connect, properties, updater, util, formatter, UpdatableStore, UserPreferences, entities) {
function Connection(name, parent, controller) {
this.name = name;
@@ -105,12 +106,35 @@ define(["dojo/_base/xhr",
that.updateHeader();
that.sessionsGrid = new UpdatableStore(that.connectionData.sessions, findNode("sessions"),
- [ { name: "Name", field: "name", width: "70px"},
- { name: "Mode", field: "distributionMode", width: "70px"},
- { name: "Msgs Rate", field: "msgRate",
- width: "150px"},
- { name: "Bytes Rate", field: "bytesRate",
- width: "100%"}
+ [ { name: "Name", field: "name", width: "70px"},
+ { name: "Consumers", field: "consumerCount", width: "90px"},
+ { name: "Unacknowledged messages", field: "unacknowledgedMessages", width: "110px"},
+ { name: "Current store transaction start", field: "transactionStartTime", width: "200px",
+ formatter: function (transactionStartTime)
+ {
+ if (transactionStartTime > 0)
+ {
+ return UserPreferences.formatDateTime(transactionStartTime, {selector: "time", addOffset: true, appendTimeZone: true});
+ }
+ else
+ {
+ return "N/A";
+ }
+ }
+ },
+ { name: "Current store transaction update", field: "transactionUpdateTime", width: "100%",
+ formatter: function (transactionUpdateTime)
+ {
+ if (transactionUpdateTime > 0)
+ {
+ return UserPreferences.formatDateTime(transactionUpdateTime, {selector: "time", addOffset: true, appendTimeZone: true});
+ }
+ else
+ {
+ return "N/A";
+ }
+ }
+ }
]);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
index 5be3be615f..ad4a370a04 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
@@ -32,6 +32,8 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.servlet.http.HttpServletResponse;
+import org.junit.Assert;
+
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -88,6 +90,8 @@ public class ConnectionRestTest extends QpidRestTestCase
m = consumer.receive(1000l);
assertNotNull("Message was not received after rollback", m);
}
+
+ // Session left open
}
public void testGetAllConnections() throws Exception
@@ -175,6 +179,34 @@ public class ConnectionRestTest extends QpidRestTestCase
assertSession(sessions.get(0), (AMQSession<?, ?>) _session);
}
+ public void testProducerSessionOpenHasTransactionStartAndUpdateTimes() throws Exception
+ {
+ Destination queue = _session.createQueue(getTestQueueName());
+ MessageProducer producer = _session.createProducer(queue);
+ producer.send(_session.createMessage());
+ // session left open
+
+ String connectionName = getConnectionName();
+
+ List<Map<String, Object>> sessions = getRestTestHelper().getJsonAsList("session/test/test/"
+ + URLDecoder.decode(connectionName, "UTF-8") + "/" + ((AMQSession<?, ?>) _session).getChannelId());
+ assertEquals("Unexpected number of sessions", 1, sessions.size());
+
+ final Map<String, Object> sessionData = sessions.get(0);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> statistics = (Map<String, Object>) sessionData.get(Asserts.STATISTICS_ATTRIBUTE);
+
+ long transactionStartTime = ((Number) statistics.get("transactionStartTime")).longValue();
+ long transactionUpdateTime = ((Number) statistics.get("transactionUpdateTime")).longValue();
+
+ assertTrue("Unexpected transaction start value for open transaction " + transactionStartTime, transactionStartTime > 0);
+ assertTrue("Unexpected transaction update value for open transaction " + transactionUpdateTime, transactionUpdateTime > 0);
+ assertTrue("Expected transaction update value " + transactionUpdateTime + " to be greater than transaction start time " + transactionStartTime, transactionUpdateTime >= transactionStartTime);
+
+
+ }
+
private void assertConnection(Map<String, Object> connectionDetails) throws JMSException
{
Asserts.assertConnection(connectionDetails, (AMQConnection) _connection);
@@ -212,27 +244,28 @@ public class ConnectionRestTest extends QpidRestTestCase
Session.STATE,
Session.DURABLE,
Session.LIFETIME_POLICY);
- assertEquals("Unexpecte value of attribute " + Session.NAME, session.getChannelId() + "",
+ assertEquals("Unexpected value of attribute " + Session.NAME, session.getChannelId() + "",
sessionData.get(Session.NAME));
- assertEquals("Unexpecte value of attribute " + Session.PRODUCER_FLOW_BLOCKED, Boolean.FALSE,
+ assertEquals("Unexpected value of attribute " + Session.PRODUCER_FLOW_BLOCKED, Boolean.FALSE,
sessionData.get(Session.PRODUCER_FLOW_BLOCKED));
- assertEquals("Unexpecte value of attribute " + Session.CHANNEL_ID, session.getChannelId(),
+ assertEquals("Unexpected value of attribute " + Session.CHANNEL_ID, session.getChannelId(),
sessionData.get(Session.CHANNEL_ID));
@SuppressWarnings("unchecked")
Map<String, Object> statistics = (Map<String, Object>) sessionData.get(Asserts.STATISTICS_ATTRIBUTE);
Asserts.assertAttributesPresent(statistics, "consumerCount",
"localTransactionBegins", "localTransactionOpen",
- "localTransactionRollbacks", "unacknowledgedMessages");
+ "localTransactionRollbacks", "unacknowledgedMessages",
+ "transactionStartTime", "transactionUpdateTime");
- assertEquals("Unexpecte value of statistic attribute " + "unacknowledgedMessages", MESSAGE_NUMBER - 1,
+ assertEquals("Unexpected value of statistic attribute " + "unacknowledgedMessages", MESSAGE_NUMBER - 1,
statistics.get("unacknowledgedMessages"));
- assertEquals("Unexpecte value of statistic attribute " + "localTransactionBegins", 4,
- statistics.get("localTransactionBegins"));
- assertEquals("Unexpecte value of statistic attribute " + "localTransactionRollbacks", 1,
- statistics.get("localTransactionRollbacks"));
- assertEquals("Unexpecte value of statistic attribute " + "consumerCount", 1,
- statistics.get("consumerCount"));
+ assertEquals("Unexpected value of statistic attribute " + "localTransactionBegins", 4,
+ statistics.get("localTransactionBegins"));
+ assertEquals("Unexpected value of statistic attribute " + "localTransactionRollbacks", 1,
+ statistics.get("localTransactionRollbacks"));
+ assertEquals("Unexpected value of statistic attribute " + "consumerCount", 1,
+ statistics.get("consumerCount"));
}
private String getConnectionName() throws IOException