summaryrefslogtreecommitdiff
path: root/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java64
1 files changed, 27 insertions, 37 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
index 3f8afc9a9a..f242111dc5 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
@@ -218,7 +218,15 @@ public class ClientJmsDelegate
synchronized(session)
{
- final Destination destination = session.createQueue(command.getDestinationName());
+ final Destination destination;
+ if(command.isTopic())
+ {
+ destination = session.createTopic(command.getDestinationName());
+ }
+ else
+ {
+ destination = session.createQueue(command.getDestinationName());
+ }
final MessageProducer jmsProducer = session.createProducer(destination);
@@ -373,30 +381,6 @@ public class ClientJmsDelegate
}
}
- public void commitOrAcknowledgeMessage(final Message message, final String sessionName)
- {
- try
- {
- final Session session = _testSessions.get(sessionName);
- if (session.getTransacted())
- {
- synchronized(session)
- {
- session.commit();
- }
- }
- else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- message.acknowledge();
- }
- }
- catch (final JMSException jmse)
- {
- throw new DistributedTestException("Unable to commit or acknowledge message on session: " +
- sessionName, jmse);
- }
- }
-
public int getAcknowledgeMode(final String sessionName)
{
try
@@ -493,31 +477,36 @@ public class ClientJmsDelegate
}
}
- public void rollbackOrRecover(String sessionName)
+ public void commitOrAcknowledgeMessageIfNecessary(final String sessionName, final Message message)
{
try
{
final Session session = _testSessions.get(sessionName);
- synchronized(session)
+ if (session.getTransacted())
{
- if (session.getTransacted())
- {
- session.rollback();
- }
- else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ synchronized(session)
{
- session.recover();
+ session.commit();
}
}
+ else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ message.acknowledge();
+ }
}
catch (final JMSException jmse)
{
- throw new DistributedTestException("Unable to rollback or recover on session: " +
+ throw new DistributedTestException("Unable to commit or acknowledge message on session: " +
sessionName, jmse);
}
}
- public void releaseMessage(String sessionName)
+ public void commitIfNecessary(final String sessionName)
+ {
+ commitOrAcknowledgeMessageIfNecessary(sessionName, null);
+ }
+
+ public void rollbackOrRecoverIfNecessary(String sessionName)
{
try
{
@@ -528,7 +517,7 @@ public class ClientJmsDelegate
{
session.rollback();
}
- else
+ else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
session.recover();
}
@@ -536,7 +525,8 @@ public class ClientJmsDelegate
}
catch (final JMSException jmse)
{
- LOGGER.warn("Unable to rollback or recover on session: " + sessionName, jmse);
+ throw new DistributedTestException("Unable to rollback or recover on session: " +
+ sessionName, jmse);
}
}