diff options
Diffstat (limited to 'java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java')
-rw-r--r-- | java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java | 64 |
1 files changed, 27 insertions, 37 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java index 3f8afc9a9a..f242111dc5 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java @@ -218,7 +218,15 @@ public class ClientJmsDelegate synchronized(session) { - final Destination destination = session.createQueue(command.getDestinationName()); + final Destination destination; + if(command.isTopic()) + { + destination = session.createTopic(command.getDestinationName()); + } + else + { + destination = session.createQueue(command.getDestinationName()); + } final MessageProducer jmsProducer = session.createProducer(destination); @@ -373,30 +381,6 @@ public class ClientJmsDelegate } } - public void commitOrAcknowledgeMessage(final Message message, final String sessionName) - { - try - { - final Session session = _testSessions.get(sessionName); - if (session.getTransacted()) - { - synchronized(session) - { - session.commit(); - } - } - else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - message.acknowledge(); - } - } - catch (final JMSException jmse) - { - throw new DistributedTestException("Unable to commit or acknowledge message on session: " + - sessionName, jmse); - } - } - public int getAcknowledgeMode(final String sessionName) { try @@ -493,31 +477,36 @@ public class ClientJmsDelegate } } - public void rollbackOrRecover(String sessionName) + public void commitOrAcknowledgeMessageIfNecessary(final String sessionName, final Message message) { try { final Session session = _testSessions.get(sessionName); - synchronized(session) + if (session.getTransacted()) { - if (session.getTransacted()) - { - session.rollback(); - } - else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + synchronized(session) { - session.recover(); + session.commit(); } } + else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + message.acknowledge(); + } } catch (final JMSException jmse) { - throw new DistributedTestException("Unable to rollback or recover on session: " + + throw new DistributedTestException("Unable to commit or acknowledge message on session: " + sessionName, jmse); } } - public void releaseMessage(String sessionName) + public void commitIfNecessary(final String sessionName) + { + commitOrAcknowledgeMessageIfNecessary(sessionName, null); + } + + public void rollbackOrRecoverIfNecessary(String sessionName) { try { @@ -528,7 +517,7 @@ public class ClientJmsDelegate { session.rollback(); } - else + else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { session.recover(); } @@ -536,7 +525,8 @@ public class ClientJmsDelegate } catch (final JMSException jmse) { - LOGGER.warn("Unable to rollback or recover on session: " + sessionName, jmse); + throw new DistributedTestException("Unable to rollback or recover on session: " + + sessionName, jmse); } } |