summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/test/java/org/apache/qpid/test/utils
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/test/java/org/apache/qpid/test/utils')
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/utils/BrokerCommandHelperTest.java79
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java484
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java94
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java288
4 files changed, 945 insertions, 0 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/BrokerCommandHelperTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/BrokerCommandHelperTest.java
new file mode 100644
index 0000000000..83c2f1e58d
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/BrokerCommandHelperTest.java
@@ -0,0 +1,79 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.test.utils;
+
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+
+public class BrokerCommandHelperTest extends QpidTestCase
+{
+ private static final String PATH_TO_QPID_EXECUTABLE = "/path / to (/qpid";
+ private static final String ARGUMENT_WITH_SPACES = " blah / blah /blah";
+ private static final String ARGUMENT_PORT = "-p";
+ private static final String ARGUMENT_PORT_VALUE = "@PORT";
+ private static final String ARGUMENT_STORE_PATH = "-sp";
+ private static final String ARGUMENT_STORE_PATH_VALUE = "@STORE_PATH";
+ private static final String ARGUMENT_STORE_TYPE = "-st";
+ private static final String ARGUMENT_STORE_TYPE_VALUE = "@STORE_TYPE";
+ private static final String ARGUMENT_LOG = "-l";
+ private static final String ARGUMENT_LOG_VALUE = "@LOG_CONFIG_FILE";
+
+ private BrokerCommandHelper _brokerCommandHelper;
+
+ private File _logConfigFile = mock(File.class);
+
+ @Override
+ public void setUp()
+ {
+ when(_logConfigFile.getAbsolutePath()).thenReturn("log Config File");
+ _brokerCommandHelper = new BrokerCommandHelper("\"" + PATH_TO_QPID_EXECUTABLE + "\" " + ARGUMENT_PORT + " "
+ + ARGUMENT_PORT_VALUE + " " + ARGUMENT_STORE_PATH + " " + ARGUMENT_STORE_PATH_VALUE + " " + ARGUMENT_STORE_TYPE
+ + " " + ARGUMENT_STORE_TYPE_VALUE + " " + ARGUMENT_LOG + " " + ARGUMENT_LOG_VALUE + " '" + ARGUMENT_WITH_SPACES
+ + "'");
+ }
+
+ public void testGetBrokerCommand()
+ {
+ String[] brokerCommand = _brokerCommandHelper.getBrokerCommand(1, "path to config file", "json", _logConfigFile);
+
+ String[] expected = { PATH_TO_QPID_EXECUTABLE, ARGUMENT_PORT, "1", ARGUMENT_STORE_PATH, "path to config file",
+ ARGUMENT_STORE_TYPE, "json", ARGUMENT_LOG, "\"log Config File\"", ARGUMENT_WITH_SPACES };
+ assertEquals("Unexpected broker command", expected.length, brokerCommand.length);
+ for (int i = 0; i < expected.length; i++)
+ {
+ assertEquals("Unexpected command part value at " + i,expected[i], brokerCommand[i] );
+ }
+ }
+
+ public void testRemoveBrokerCommandLog4JFile()
+ {
+ _brokerCommandHelper.removeBrokerCommandLog4JFile();
+ String[] brokerCommand = _brokerCommandHelper.getBrokerCommand(1, "configFile", "json", _logConfigFile);
+
+ String[] expected = { PATH_TO_QPID_EXECUTABLE, ARGUMENT_PORT, "1", ARGUMENT_STORE_PATH, "configFile",
+ ARGUMENT_STORE_TYPE, "json", ARGUMENT_WITH_SPACES };
+
+ assertEquals("Unexpected broker command", expected.length, brokerCommand.length);
+ for (int i = 0; i < expected.length; i++)
+ {
+ assertEquals("Unexpected command part value at " + i,expected[i], brokerCommand[i] );
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java
new file mode 100644
index 0000000000..3a9354d822
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/ConversationFactory.java
@@ -0,0 +1,484 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.utils;
+
+import org.apache.log4j.Logger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
+ * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
+ * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
+ *
+ * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
+ * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation
+ * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
+ * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded
+ * conversation (the conversation methods can be called many times in parallel):
+ *
+ * <p/><pre>
+ * class Initiator
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, null,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
+ * Message greeting = conversation.receive();
+ *
+ * // Exchange goodbyes.
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * Message goodbye = conversation.receive();
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ *
+ * class Responder
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * respondToConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * Message greeting = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Hello."));
+ *
+ * // Exchange goodbyes.
+ * Message goodbye = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * <p/>Conversation correlation id's are generated on a per thread basis.
+ *
+ * <p/>The same controlSession is shared amongst all conversations. Calls to send are therefore synchronized because JMS
+ * sessions are not multi-threaded.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Associate messages to an ongoing conversation using correlation ids.
+ * <tr><td> Auto manage sessions for conversations.
+ * <tr><td> Store messages not in a conversation in dead letter box.
+ * </table>
+ */
+public class ConversationFactory
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(ConversationFactory.class);
+
+ /** Holds a map from correlation id's to queues. */
+ private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>();
+
+ /** Holds the connection over which the conversation is conducted. */
+ private Connection connection;
+
+ /** Holds the controlSession over which the conversation is conduxted. */
+ private Session session;
+
+ /** The message consumer for incoming messages. */
+ private MessageConsumer consumer;
+
+ /** The message producer for outgoing messages. */
+ private MessageProducer producer;
+
+ /** The well-known or temporary destination to receive replies on. */
+ private Destination receiveDestination;
+
+ /** Holds the queue implementation class for the reply queue. */
+ private Class<? extends BlockingQueue> queueClass;
+
+ /** Used to hold any replies that are received outside of the context of a conversation. */
+ private BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
+
+ /* Used to hold conversation state on a per thread basis. */
+ /*
+ ThreadLocal<Conversation> threadLocals =
+ new ThreadLocal<Conversation>()
+ {
+ protected Conversation initialValue()
+ {
+ Conversation settings = new Conversation();
+ settings.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return settings;
+ }
+ };
+ */
+
+ /** Generates new coversation id's as needed. */
+ private AtomicLong conversationIdGenerator = new AtomicLong();
+
+ /**
+ * Creates a conversation helper on the specified connection with the default sending destination, and listening
+ * to the specified receiving destination.
+ *
+ * @param connection The connection to build the conversation helper on.
+ * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary
+ * queue.
+ * @param queueClass The queue implementation class.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public ConversationFactory(Connection connection, Destination receiveDestination,
+ Class<? extends BlockingQueue> queueClass) throws JMSException
+ {
+ log.debug("public ConversationFactory(Connection connection, Destination receiveDestination = " + receiveDestination
+ + ", Class<? extends BlockingQueue> queueClass = " + queueClass + "): called");
+
+ this.connection = connection;
+ this.queueClass = queueClass;
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Check if a well-known receive destination has been provided, or use a temporary queue if not.
+ this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue();
+
+ consumer = session.createConsumer(receiveDestination);
+ producer = session.createProducer(null);
+
+ consumer.setMessageListener(new Receiver());
+ }
+
+ /**
+ * Creates a new conversation context.
+ *
+ * @return A new conversation context.
+ */
+ public Conversation startConversation()
+ {
+ log.debug("public Conversation startConversation(): called");
+
+ Conversation conversation = new Conversation();
+ conversation.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return conversation;
+ }
+
+ /**
+ * Ensures that the reply queue for a conversation exists.
+ *
+ * @param conversationId The conversation correlation id.
+ */
+ private void initQueueForId(long conversationId)
+ {
+ if (!idsToQueues.containsKey(conversationId))
+ {
+ idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass));
+ }
+ }
+
+ /**
+ * Clears the dead letter box, returning all messages that were in it.
+ *
+ * @return All messages in the dead letter box.
+ */
+ public Collection<Message> emptyDeadLetterBox()
+ {
+ log.debug("public Collection<Message> emptyDeadLetterBox(): called");
+
+ Collection<Message> result = new ArrayList<Message>();
+ deadLetterBox.drainTo(result);
+
+ return result;
+ }
+
+ /**
+ * Gets the controlSession over which the conversation is conducted.
+ *
+ * @return The controlSession over which the conversation is conducted.
+ */
+ public Session getSession()
+ {
+ // Conversation settings = threadLocals.get();
+
+ return session;
+ }
+
+ /**
+ * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply
+ * destination automatically updated to the last received reply-to destination.
+ */
+ public class Conversation
+ {
+ /** Holds the correlation id for the context. */
+ private long conversationId;
+
+ /**
+ * Holds the send destination for the context. This will automatically be updated to the most recently received
+ * reply-to destination.
+ */
+ private Destination sendDestination;
+
+ /**
+ * Sends a message to the default sending location. The correlation id of the message will be assigned by this
+ * method, overriding any previously set value.
+ *
+ * @param sendDestination The destination to send to. This may be null to use the last received reply-to
+ * destination.
+ * @param message The message to send.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no
+ * send destination is specified and there is no most recent reply-to destination available
+ * to use.
+ */
+ public void send(Destination sendDestination, Message message) throws JMSException
+ {
+ log.debug("public void send(Destination sendDestination = " + sendDestination + ", Message message = "
+ + message.getJMSMessageID() + "): called");
+
+ // Conversation settings = threadLocals.get();
+ // long conversationId = conversationId;
+ message.setJMSCorrelationID(Long.toString(conversationId));
+ message.setJMSReplyTo(receiveDestination);
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ // Check if an overriding send to destination has been set or use the last reply-to if not.
+ Destination sendTo = null;
+
+ if (sendDestination != null)
+ {
+ sendTo = sendDestination;
+ }
+ else
+ {
+ throw new JMSException("The send destination was specified, and no most recent reply-to available to use.");
+ }
+
+ // Send the message.
+ synchronized (this)
+ {
+ producer.send(sendTo, message);
+ }
+ }
+
+ /**
+ * Gets the next message in an ongoing conversation. This method may block until such a message is received.
+ *
+ * @return The next incoming message in the conversation.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message
+ * did not have its reply-to destination set up.
+ */
+ public Message receive() throws JMSException
+ {
+ log.debug("public Message receive(): called");
+
+ // Conversation settings = threadLocals.get();
+ // long conversationId = settings.conversationId;
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ try
+ {
+ Message result = queue.take();
+
+ // Keep the reply-to destination to send replies to.
+ sendDestination = result.getJMSReplyTo();
+
+ return result;
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
+ * received they will be returned. If a timeout is specified, then all messages up to the limit, received within
+ * that timespan will be returned. At least one of the message count or timeout should be set to a value of
+ * 1 or greater.
+ *
+ * @param num The number of messages to receive, or all if this is less than 1.
+ * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1.
+ *
+ * @return All messages received within the count limit and the timeout.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through.
+ */
+ public Collection<Message> receiveAll(int num, long timeout) throws JMSException
+ {
+ log.debug("public Collection<Message> receiveAll(int num = " + num + ", long timeout = " + timeout
+ + "): called");
+
+ // Check that a timeout or message count was set.
+ if ((num < 1) && (timeout < 1))
+ {
+ throw new IllegalArgumentException("At least one of message count (num) or timeout must be set.");
+ }
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ // Used to collect the received messages in.
+ Collection<Message> result = new ArrayList<Message>();
+
+ // Used to indicate when the timeout or message count has expired.
+ boolean receiveMore = true;
+
+ int messageCount = 0;
+
+ // Receive messages until the timeout or message count expires.
+ do
+ {
+ try
+ {
+ Message next = null;
+
+ // Try to receive the message with a timeout if one has been set.
+ if (timeout > 0)
+ {
+ next = queue.poll(timeout, TimeUnit.MILLISECONDS);
+
+ // Check if the timeout expired, and stop receiving if so.
+ if (next == null)
+ {
+ receiveMore = false;
+ }
+ }
+ // Receive the message without a timeout.
+ else
+ {
+ next = queue.take();
+ }
+
+ // Increment the message count if a message was received.
+ messageCount += (next != null) ? 1 : 0;
+
+ // Check if all the requested messages were received, and stop receiving if so.
+ if ((num > 0) && (messageCount >= num))
+ {
+ receiveMore = false;
+ }
+
+ // Keep the reply-to destination to send replies to.
+ sendDestination = (next != null) ? next.getJMSReplyTo() : sendDestination;
+
+ if (next != null)
+ {
+ result.add(next);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the threads interrupted status.
+ Thread.currentThread().interrupt();
+
+ // Stop receiving but return the messages received so far.
+ receiveMore = false;
+ }
+ }
+ while (receiveMore);
+
+ return result;
+ }
+
+ /**
+ * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any
+ * incoming messages using them will go to the dead letter box.
+ */
+ public void end()
+ {
+ log.debug("public void end(): called");
+
+ // Ensure that the thread local for the current thread is cleaned up.
+ // Conversation settings = threadLocals.get();
+ // long conversationId = settings.conversationId;
+ // threadLocals.remove();
+
+ // Ensure that its queue is removed from the queue map.
+ BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
+
+ // Move any outstanding messages on the threads conversation id into the dead letter box.
+ queue.drainTo(deadLetterBox);
+ }
+ }
+
+ /**
+ * Implements the message listener for this conversation handler.
+ */
+ protected class Receiver implements MessageListener
+ {
+ /**
+ * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id
+ * and placed into queues.
+ *
+ * @param message The incoming message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
+ try
+ {
+ Long conversationId = Long.parseLong(message.getJMSCorrelationID());
+
+ // Find the converstaion queue to place the message on. If there is no conversation for the message id,
+ // the the dead letter box queue is used.
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+ queue = (queue == null) ? deadLetterBox : queue;
+
+ queue.put(message);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java
new file mode 100644
index 0000000000..f6c481431a
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.util.FileUtils;
+
+import javax.naming.NamingException;
+
+public class FailoverBaseCase extends QpidBrokerTestCase
+{
+ protected static final Logger _logger = LoggerFactory.getLogger(FailoverBaseCase.class);
+
+ public static final long DEFAULT_FAILOVER_TIME = 10000L;
+
+ protected void setUp() throws java.lang.Exception
+ {
+ super.setUp();
+ startBroker(getFailingPort());
+ }
+
+ /**
+ * We are using failover factories
+ *
+ * @return a connection
+ * @throws Exception
+ */
+ @Override
+ public AMQConnectionFactory getConnectionFactory() throws NamingException
+ {
+ _logger.info("get ConnectionFactory");
+ if (_connectionFactory == null)
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ _connectionFactory = getConnectionFactory("failover.ssl");
+ }
+ else
+ {
+ _connectionFactory = getConnectionFactory("failover");
+ }
+ }
+ return _connectionFactory;
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ // Ensure we shutdown any secondary brokers, even if we are unable
+ // to cleanly tearDown the QTC.
+ stopBroker(getFailingPort());
+ FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort());
+ }
+ }
+
+ public void failBroker(int port)
+ {
+ try
+ {
+ //TODO: use killBroker instead
+ stopBroker(port);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java
new file mode 100644
index 0000000000..0e0032da64
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java
@@ -0,0 +1,288 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.JMSAMQException;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class QpidClientConnection extends QpidBrokerTestCase implements ExceptionListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(QpidClientConnection.class);
+
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnection(String broker)
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(broker);
+ setPrefetch(5000);
+ }
+
+
+ public Connection getConnection()
+ {
+ return connection;
+ }
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = getConnection("guest", "guest") ;
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (Exception e)
+ {
+ throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ public void disconnect() throws Exception
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+}