summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/old_test/java/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/old_test/java/org/apache')
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java185
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java213
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java212
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/README.txt11
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java129
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java277
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/codec/Client.java133
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/codec/Server.java103
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java35
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/config/AbstractConfig.java69
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java29
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/config/Connector.java40
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/config/ConnectorConfig.java28
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java117
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java112
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java196
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java167
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/headers/Listener.java117
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/headers/MessageFactory.java175
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/headers/Publisher.java133
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Bind.java273
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Lookup.java196
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Unbind.java166
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java153
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java102
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java93
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java271
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java269
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java176
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java122
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java95
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java153
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/example.properties38
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/topic/Config.java243
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/topic/Listener.java141
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java155
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/topic/Publisher.java175
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java110
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java45
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java45
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java127
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java44
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java151
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java185
44 files changed, 6009 insertions, 0 deletions
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java b/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java
new file mode 100644
index 0000000000..2c08f1e34a
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.IBMPerfTest;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.File;
+import java.util.Hashtable;
+
+public class JNDIBindConnectionFactory
+{
+
+ public static final String CONNECTION_FACTORY_BINDING = "amq.ConnectionFactory";
+ public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
+ public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
+ public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
+ public static final String DEFAULT_CONNECTION_URL = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
+
+ private static void printUsage()
+ {
+ System.out.println("Using default values: Usage:java JNDIBindConnectionFactory <connection url> [<Connection Factory Binding>] [<Provider URL>] [<JNDI Context Factory>]");
+
+ }
+
+ public static void main(String[] args)
+ {
+ Logger.getRootLogger().setLevel(Level.OFF);
+
+ String connectionFactoryBinding = CONNECTION_FACTORY_BINDING;
+ String provider = PROVIDER_URL;
+ String contextFactory = FSCONTEXT_FACTORY;
+ if (args.length == 0)
+ {
+ printUsage();
+ System.exit(1);
+ }
+
+ String connectionURL = args[0];
+
+ System.out.println("Using Connection:" + connectionURL + "\n");
+
+
+ if (args.length > 1)
+ {
+ connectionFactoryBinding = args[1];
+
+ if (args.length > 2)
+ {
+ provider = args[2];
+
+ if (args.length > 3)
+ {
+ contextFactory = args[3];
+ }
+ }
+ else
+ {
+ System.out.println("Using default File System Context Factory");
+ System.out.println("Using default Connection Factory Binding:" + connectionFactoryBinding);
+ }
+ }
+ else
+ {
+ printUsage();
+ }
+
+
+ System.out.println("File System Context Factory\n" +
+ "Connection:" + connectionURL + "\n" +
+ "Connection Factory Binding:" + connectionFactoryBinding + "\n" +
+ "JNDI Provider URL:" + provider);
+
+ if (provider.startsWith("file"))
+ {
+ File file = new File(provider.substring(provider.indexOf("://") + 3));
+
+ if (file.exists() && !file.isDirectory())
+ {
+ System.out.println("Couldn't make directory file already exists");
+ System.exit(1);
+ }
+ else
+ {
+ if (!file.exists())
+ {
+ if (!file.mkdirs())
+ {
+ System.out.println("Couldn't make directory");
+ System.exit(1);
+ }
+ }
+ }
+ }
+
+ new JNDIBindConnectionFactory(provider, connectionFactoryBinding, contextFactory, connectionURL);
+
+ }
+
+ public JNDIBindConnectionFactory(String provider, String binding, String contextFactory, String CONNECTION_URL)
+ {
+ // Set up the environment for creating the initial context
+ Hashtable env = new Hashtable(11);
+ env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
+
+ env.put(Context.PROVIDER_URL, provider);
+
+ try
+ {
+ // Create the initial context
+ Context ctx = new InitialContext(env);
+
+ // Create the object to be bound
+ ConnectionFactory factory = null;
+
+ try
+ {
+ factory = new AMQConnectionFactory(CONNECTION_URL);
+
+
+ try
+ {
+ Object obj = ctx.lookup(binding);
+
+ if (obj != null)
+ {
+ System.out.println("Un-binding previous Connection Factory");
+ ctx.unbind(binding);
+ }
+ }
+ catch (NamingException e)
+ {
+ System.out.println("Operation failed: " + e);
+ }
+
+ // Perform the bind
+ ctx.bind(binding, factory);
+ System.out.println("Bound Connection Factory:" + binding);
+
+ // Check that it is bound
+ Object obj = ctx.lookup(binding);
+ System.out.println("Connection URL:" + ((AMQConnectionFactory) obj).getConnectionURL());
+
+ System.out.println("JNDI FS Context:" + provider);
+ }
+ catch (NamingException amqe)
+ {
+ System.out.println("Operation failed: " + amqe);
+ }
+ catch (URLSyntaxException e)
+ {
+ System.out.println("Operation failed: " + e);
+ }
+
+ }
+ catch (NamingException e)
+ {
+ System.out.println("Operation failed: " + e);
+ }
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java b/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
new file mode 100644
index 0000000000..10e8b94311
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
@@ -0,0 +1,213 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.IBMPerfTest;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.File;
+import java.util.Hashtable;
+
+public class JNDIBindQueue
+{
+ public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
+ public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
+ public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
+
+ Connection _connection = null;
+ Context _ctx = null;
+
+
+ public JNDIBindQueue(String queueBinding, String queueName, String provider, String contextFactory)
+ {
+ // Set up the environment for creating the initial context
+ Hashtable env = new Hashtable(11);
+ env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
+
+ env.put(Context.PROVIDER_URL, provider);
+
+ try
+ {
+ // Create the initial context
+ _ctx = new InitialContext(env);
+
+ // Create the object to be bound
+
+ try
+ {
+ _connection = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'");
+ System.out.println("Connected");
+ }
+ catch (Exception amqe)
+ {
+ System.out.println("Unable to create AMQConnectionFactory:" + amqe);
+ }
+
+ if (_connection != null)
+ {
+ bindQueue(queueName, queueBinding);
+ }
+
+ // Check that it is bound
+ Object obj = _ctx.lookup(queueBinding);
+
+ System.out.println("Bound Queue:" + ((AMQQueue) obj).toURL());
+
+ System.out.println("JNDI FS Context:" + provider);
+
+ }
+ catch (NamingException e)
+ {
+ System.out.println("Operation failed: " + e);
+ }
+ finally
+ {
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ catch (JMSException closeE)
+ {
+ System.out.println("Connection closing failed: " + closeE);
+ }
+ }
+
+
+ }
+
+
+ private void bindQueue(String queueName, String queueBinding) throws NamingException
+ {
+
+ try
+ {
+ Object obj = _ctx.lookup(queueBinding);
+
+ if (obj != null)
+ {
+ System.out.println("Un-binding exisiting object");
+ _ctx.unbind(queueBinding);
+ }
+ }
+ catch (NamingException e)
+ {
+
+ }
+
+ Queue queue = null;
+ try
+ {
+
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ if (session != null)
+ {
+ queue = ((AMQSession) session).createQueue(queueName);
+ }
+ }
+ catch (JMSException jmse)
+ {
+ System.out.println("Unable to create Queue:" + jmse);
+ }
+
+ // Perform the bind
+ _ctx.bind(queueBinding, queue);
+ }
+
+
+ public static void main(String[] args)
+ {
+ Logger.getRootLogger().setLevel(Level.OFF);
+
+ String provider = JNDIBindQueue.PROVIDER_URL;
+ String contextFactory = JNDIBindQueue.FSCONTEXT_FACTORY;
+
+ if (args.length > 1)
+ {
+ String binding = args[0];
+ String queueName = args[1];
+
+ if (args.length > 2)
+ {
+ provider = args[2];
+
+ if (args.length > 3)
+ {
+ contextFactory = args[3];
+ }
+ }
+ else
+ {
+ System.out.println("Using default File System Context Factory");
+ }
+
+ System.out.println("File System Context Factory\n" +
+ "Binding Queue:'" + queueName + "' to '" + binding + "'\n" +
+ "JNDI Provider URL:" + provider);
+
+ if (provider.startsWith("file"))
+ {
+ File file = new File(provider.substring(provider.indexOf("://") + 3));
+
+ if (file.exists() && !file.isDirectory())
+ {
+ System.out.println("Couldn't make directory file already exists");
+ System.exit(1);
+ }
+ else
+ {
+ if (!file.exists())
+ {
+ if (!file.mkdirs())
+ {
+ System.out.println("Couldn't make directory");
+ System.exit(1);
+ }
+ }
+ }
+ }
+
+
+ new JNDIBindQueue(binding, queueName, provider, contextFactory);
+
+ }
+ else
+ {
+ System.out.println("Using Defaults: Usage:java JNDIBindQueue <Binding> <queue name> [<Provider URL> [<JNDI Context Factory>]]");
+ }
+
+ }
+
+
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java b/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
new file mode 100644
index 0000000000..ca071c1187
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.IBMPerfTest;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.File;
+import java.util.Hashtable;
+
+public class JNDIBindTopic
+{
+ public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
+ public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
+
+ public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
+
+ Connection _connection = null;
+ Context _ctx = null;
+
+
+ public JNDIBindTopic(String topicBinding, String topicName, String provider, String contextFactory)
+ {
+ // Set up the environment for creating the initial context
+ Hashtable env = new Hashtable(11);
+ env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
+
+ env.put(Context.PROVIDER_URL, provider);
+
+ try
+ {
+ // Create the initial context
+ _ctx = new InitialContext(env);
+
+ // Create the object to be bound
+
+ try
+ {
+ _connection = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'");
+ System.out.println("Connected");
+ }
+ catch (Exception amqe)
+ {
+ System.out.println("Unable to create AMQConnectionFactory:" + amqe);
+ }
+
+ if (_connection != null)
+ {
+ bindTopic(topicName, topicBinding);
+ }
+
+ // Check that it is bound
+ Object obj = _ctx.lookup(topicBinding);
+
+ System.out.println("Bound Queue:" + ((AMQTopic) obj).toURL());
+
+ System.out.println("JNDI FS Context:" + provider);
+
+ }
+ catch (NamingException e)
+ {
+ System.out.println("Operation failed: " + e);
+ }
+ finally
+ {
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ catch (JMSException closeE)
+ {
+ System.out.println("Operation failed: " + closeE);
+ }
+ }
+ }
+
+
+ private void bindTopic(String topicName, String topicBinding) throws NamingException
+ {
+
+ try
+ {
+ Object obj = _ctx.lookup(topicBinding);
+
+ if (obj != null)
+ {
+ System.out.println("Un-binding exisiting object");
+ _ctx.unbind(topicBinding);
+ }
+ }
+ catch (NamingException e)
+ {
+
+ }
+
+ Topic topic = null;
+ try
+ {
+
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ if (session != null)
+ {
+ topic = ((AMQSession) session).createTopic(topicName);
+ }
+ }
+ catch (JMSException jmse)
+ {
+ System.out.println("Unable to create Topic:" + jmse);
+ }
+
+ // Perform the bind
+ _ctx.bind(topicBinding, topic);
+ }
+
+
+ public static void main(String[] args)
+ {
+ Logger.getRootLogger().setLevel(Level.OFF);
+
+ String provider = JNDIBindTopic.PROVIDER_URL;
+ String contextFactory = JNDIBindTopic.FSCONTEXT_FACTORY;
+
+ if (args.length > 1)
+ {
+ String binding = args[0];
+ String queueName = args[1];
+
+ if (args.length > 2)
+ {
+ provider = args[2];
+
+ if (args.length > 3)
+ {
+ contextFactory = args[3];
+ }
+ }
+ else
+ {
+ System.out.println("Using default File System Context Factory");
+ }
+
+ System.out.println("File System Context Factory\n" +
+ "Binding Topic:'" + queueName + "' to '" + binding + "'\n" +
+ "JNDI Provider URL:" + provider);
+
+
+ if (provider.startsWith("file"))
+ {
+ File file = new File(provider.substring(provider.indexOf("://") + 3));
+
+ if (file.exists() && !file.isDirectory())
+ {
+ System.out.println("Couldn't make directory file already exists");
+ System.exit(1);
+ }
+ else
+ {
+ if (!file.exists())
+ {
+ if (!file.mkdirs())
+ {
+ System.out.println("Couldn't make directory");
+ System.exit(1);
+ }
+ }
+ }
+ }
+
+ new JNDIBindTopic(binding, queueName, provider, contextFactory);
+
+ }
+ else
+ {
+ System.out.println("Usage:java JNDIBindTopic <Binding> <topic name> [<Provider URL> [<JNDI Context Factory>]]");
+ }
+
+ }
+
+
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/README.txt b/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/README.txt
new file mode 100644
index 0000000000..95ee9f9c77
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/README.txt
@@ -0,0 +1,11 @@
+These JNDI setup tools are mainly for use in conjunction with the IBM JMS Performance Harness available here:
+The jar should be placed in the client/test/lib/ directory.
+
+http://www.alphaworks.ibm.com/tech/perfharness
+
+
+These JNDI classes use the the SUN FileSystem context.
+There are two jar files that should be placed in your client/test/lib directory.
+
+http://javashoplm.sun.com/ECom/docs/Welcome.jsp?StoreId=22&PartDetailId=7110-jndi-1.2.1-oth-JPR&SiteId=JSC&TransactionId=noreg
+
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java b/qpid/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java
new file mode 100644
index 0000000000..cf8059a143
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/cluster/Client.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.cluster;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+import java.util.Random;
+
+public class Client
+{
+ private final Random random = new Random();
+ private final String name;
+ private final Session session;
+ private final MessageProducer topicProducer;
+ private final MessageProducer queueProducer;
+
+ Client(AMQConnection connection, String name) throws JMSException, InterruptedException
+ {
+ this.name = name;
+ session = connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ AMQTopic topic = new AMQTopic(((AMQSession)session).getDefaultTopicExchangeName(), new AMQShortString("cluster_test_topic"));
+ AMQQueue queue = new AMQQueue(((AMQSession)session).getDefaultQueueExchangeName(), new AMQShortString("cluster_test_queue"));
+
+ topicProducer = session.createProducer(topic);
+ queueProducer = session.createProducer(queue);
+
+ //subscribe to a known topic
+ session.createConsumer(topic).setMessageListener(new TopicHandler());
+ //subscribe to a known queue
+ session.createConsumer(queue).setMessageListener(new QueueHandler());
+
+ connection.start();
+
+ while(true)
+ {
+ Thread.sleep(random.nextInt(60000));
+ sendToQueue(name + ":" + randomString(5));
+ }
+ }
+
+ private synchronized void sendToTopic(String message) throws JMSException
+ {
+ topicProducer.send(session.createTextMessage(message));
+ }
+
+ private synchronized void sendToQueue(String message) throws JMSException
+ {
+ queueProducer.send(session.createTextMessage(message));
+ }
+
+ private String randomString(int length){
+ char[] c = new char[length];
+ for(int i = 0; i < length; i++)
+ {
+ c[i] = (char) ('A' + random.nextInt(26));
+ }
+ return new String(c);
+ }
+
+ private class QueueHandler implements MessageListener
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ sendToTopic(((TextMessage) message).getText());
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private class TopicHandler implements MessageListener
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ System.out.println(((TextMessage) message).getText());
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static void main(String[] argv) throws AMQException, JMSException, InterruptedException, URLSyntaxException
+ {
+ //assume args describe the set of brokers to try
+
+ String clientName = argv.length > 1 ? argv[1] : "testClient";
+ new Client(new AMQConnection(argv.length > 0 ? argv[0] : "vm://:1", "guest", "guest", clientName, "/test"), clientName);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
new file mode 100644
index 0000000000..1db7e200bd
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
@@ -0,0 +1,277 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.qpid.framing.*;
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.BaseIoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+import java.net.SocketAddress;
+
+/**
+ */
+public class BasicDeliverTest
+{
+ public static void main(String[] argv) throws Exception
+ {
+ BasicDeliverTest test = new BasicDeliverTest();
+
+ //warm up:
+ test.encode(512, 100000);
+
+ //real tests:
+ test.encode(16, 10000, 15);
+ test.encode(32, 10000, 15);
+ test.encode(64, 10000, 15);
+ test.encode(128, 10000, 15);
+ test.encode(256, 10000, 15);
+ test.encode(512, 10000, 15);
+ test.encode(1024, 10000, 15);
+ test.encode(2048, 10000, 15);
+
+ test.decode(16, 10000, 15);
+ test.decode(32, 10000, 15);
+ test.decode(64, 10000, 15);
+ test.decode(128, 10000, 15);
+ test.decode(256, 10000, 15);
+ test.decode(512, 10000, 15);
+ test.decode(1024, 10000, 15);
+ test.decode(2048, 10000, 15);
+ }
+
+ void decode(int size, int count, int iterations) throws Exception
+ {
+ long min = Long.MAX_VALUE;
+ long max = 0;
+ long total = 0;
+ for (int i = 0; i < iterations; i++)
+ {
+ long time = decode(size, count);
+ total += time;
+ if (time < min)
+ {
+ min = time;
+ }
+ if (time > max)
+ {
+ max = time;
+ }
+ }
+ System.out.println("Decoded " + count + " messages of " + size +
+ " bytes: avg=" + (total / iterations) + ", min=" + min + ", max=" + max);
+ }
+
+
+ long decode(int size, int count) throws Exception
+ {
+ AMQDataBlock block = getDataBlock(size);
+ ByteBuffer data = ByteBuffer.allocate((int) block.getSize()); // XXX: Is cast a problem?
+ block.writePayload(data);
+ data.flip();
+ AMQDecoder decoder = new AMQDecoder(false);
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < count; i++)
+ {
+ decoder.decode(session, data, decoderOutput);
+ data.rewind();
+ }
+ return System.currentTimeMillis() - start;
+ }
+
+ void encode(int size, int count, int iterations) throws Exception
+ {
+ long min = Long.MAX_VALUE;
+ long max = 0;
+ long total = 0;
+ for (int i = 0; i < iterations; i++)
+ {
+ long time = encode(size, count);
+ total += time;
+ if (time < min)
+ {
+ min = time;
+ }
+ if (time > max)
+ {
+ max = time;
+ }
+ }
+ System.out.println("Encoded " + count + " messages of " + size +
+ " bytes: avg=" + (total / iterations) + ", min=" + min + ", max=" + max);
+ }
+
+ long encode(int size, int count) throws Exception
+ {
+ IoSession session = null;
+ AMQDataBlock block = getDataBlock(size);
+ AMQEncoder encoder = new AMQEncoder();
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < count; i++)
+ {
+ encoder.encode(session, block, encoderOutput);
+ }
+ return System.currentTimeMillis() - start;
+ }
+
+ private final ProtocolEncoderOutput encoderOutput = new ProtocolEncoderOutput()
+ {
+
+ public void write(ByteBuffer byteBuffer)
+ {
+ }
+
+ public void mergeAll()
+ {
+ }
+
+ public WriteFuture flush()
+ {
+ return null;
+ }
+ };
+
+ private final ProtocolDecoderOutput decoderOutput = new ProtocolDecoderOutput()
+ {
+ public void write(Object object)
+ {
+ }
+
+ public void flush()
+ {
+ }
+ };
+
+ private final IoSession session = new BaseIoSession()
+ {
+
+ protected void updateTrafficMask()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoService getService()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoServiceConfig getServiceConfig()
+ {
+ return null;
+ }
+
+ public IoHandler getHandler()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TransportType getTransportType()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ };
+
+ private static final char[] DATA = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
+
+ static CompositeAMQDataBlock getDataBlock(int size)
+ {
+ //create a frame representing message delivery
+ AMQFrame[] frames = new AMQFrame[3];
+ frames[0] = wrapBody(createBasicDeliverBody());
+ frames[1] = wrapBody(createContentHeaderBody());
+ frames[2] = wrapBody(createContentBody(size));
+
+ return new CompositeAMQDataBlock(frames);
+ }
+
+ static AMQFrame wrapBody(AMQBody body)
+ {
+ AMQFrame frame = new AMQFrame(1, body);
+ return frame;
+ }
+
+ static ContentBody createContentBody(int size)
+ {
+ ContentBody body = new ContentBody();
+ body.payload = ByteBuffer.allocate(size);
+ for (int i = 0; i < size; i++)
+ {
+ body.payload.put((byte) DATA[i % DATA.length]);
+ }
+ return body;
+ }
+
+ static ContentHeaderBody createContentHeaderBody()
+ {
+ ContentHeaderBody body = new ContentHeaderBody();
+ body.properties = new BasicContentHeaderProperties();
+ body.weight = 1;
+ body.classId = 6;
+ return body;
+ }
+
+ static BasicDeliverBody createBasicDeliverBody()
+ {
+ BasicDeliverBody body = new BasicDeliverBody((byte) 8, (byte) 0,
+ BasicDeliverBody.getClazz((byte) 8, (byte) 0),
+ BasicDeliverBody.getMethod((byte) 8, (byte) 0),
+ new AMQShortString("myConsumerTag"), 1,
+ new AMQShortString("myExchange"), false,
+ new AMQShortString("myRoutingKey"));
+ return body;
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/Client.java b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/Client.java
new file mode 100644
index 0000000000..3886021277
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/Client.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.ContentBody;
+
+import java.net.InetSocketAddress;
+
+public class Client extends IoHandlerAdapter
+{
+ //private static final int[] DEFAULT_SIZES = new int[]{1024, 512, 256, 128, 56};
+ //private static final int[] DEFAULT_SIZES = new int[]{256, 256, 256, 256, 256, 512, 512, 512, 512, 512};
+ private static final int[] DEFAULT_SIZES = new int[]{256, 512, 256, 512, 256, 512, 256, 512, 256, 512};
+ //private static final int[] DEFAULT_SIZES = new int[]{1024, 1024, 1024, 1024, 1024};
+
+ private final IoSession _session;
+ private final long _start;
+ private final int _size;
+ private final int _count;
+ private int _received;
+ private boolean _closed;
+
+ Client(String host, int port, int size, int count) throws Exception
+ {
+ _count = count;
+ _size = size;
+ AMQDataBlock block = BasicDeliverTest.getDataBlock(size);
+
+ InetSocketAddress address = new InetSocketAddress(host, port);
+ ConnectFuture future = new SocketConnector().connect(address, this);
+ future.join();
+ _session = future.getSession();
+
+ _start = System.currentTimeMillis();
+ for(int i = 0; i < count; i++)
+ {
+ _session.write(block);
+ }
+ }
+
+ void close()
+ {
+ long time = System.currentTimeMillis() - _start;
+ System.out.println("Received " + _received + " messages of " + _size
+ + " bytes in " + time + "ms.");
+ _session.close();
+ synchronized(this)
+ {
+ _closed = true;
+ notify();
+ }
+ }
+
+ void waitForClose() throws InterruptedException
+ {
+ synchronized(this)
+ {
+ while(!_closed)
+ {
+ wait();
+ }
+ }
+ }
+
+ public void sessionCreated(IoSession session) throws Exception
+ {
+ session.getFilterChain().addLast("protocolFilter", new ProtocolCodecFilter(new AMQCodecFactory(false)));
+ }
+
+ public void messageReceived(IoSession session, Object object) throws Exception
+ {
+ if(isContent(object) && ++_received == _count) close();
+ }
+
+ public void exceptionCaught(IoSession session, Throwable throwable) throws Exception
+ {
+ throwable.printStackTrace();
+ close();
+ }
+
+ private static boolean isDeliver(Object o)
+ {
+ return o instanceof AMQFrame && ((AMQFrame) o).getBodyFrame() instanceof BasicDeliverBody;
+ }
+
+ private static boolean isContent(Object o)
+ {
+ return o instanceof AMQFrame && ((AMQFrame) o).getBodyFrame() instanceof ContentBody;
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ String host = argv.length > 0 ? argv[0] : "localhost";
+ int port = argv.length > 1 ? Integer.parseInt(argv[1]) : 8888;
+ int count = argv.length > 2 ? Integer.parseInt(argv[2]) : 10000;
+ int[] sizes = argv.length > 3 ? new int[]{Integer.parseInt(argv[3])} : DEFAULT_SIZES;
+
+ System.out.println("Connecting to " + host + ":" + port);
+
+ for(int i = 0; i < sizes.length; i++)
+ {
+ new Client(host, port, sizes[i], count).waitForClose();
+ Thread.sleep(1000);
+ }
+ }
+
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/Server.java b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/Server.java
new file mode 100644
index 0000000000..fa4295e0b2
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/Server.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.util.SessionUtil;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.CompositeAMQDataBlock;
+
+import java.net.InetSocketAddress;
+
+public class Server extends IoHandlerAdapter
+{
+ Server(int port) throws Exception
+ {
+ new SocketAcceptor().bind(new InetSocketAddress(port), this);
+ System.out.println("Listening on " + port);
+ }
+
+ public void sessionCreated(IoSession session) throws Exception
+ {
+ SessionUtil.initialize(session);
+ session.getFilterChain().addLast("protocolFilter", new ProtocolCodecFilter(new AMQCodecFactory(false)));
+ }
+
+ public void messageReceived(IoSession session, Object object) throws Exception
+ {
+ getAccumulator(session).received(session, (AMQFrame) object);
+ }
+
+ public void sessionOpened(IoSession session) throws Exception
+ {
+ System.out.println("sessionOpened()");
+ }
+
+ public void sessionClosed(IoSession session) throws Exception
+ {
+ System.out.println("sessionClosed()");
+ }
+
+ public void exceptionCaught(IoSession session, Throwable t) throws Exception
+ {
+ System.out.println("exceptionCaught()");
+ t.printStackTrace();
+ session.close();
+ }
+
+ private Accumulator getAccumulator(IoSession session)
+ {
+ Accumulator a = (Accumulator) session.getAttribute(ACCUMULATOR);
+ if(a == null)
+ {
+ a = new Accumulator();
+ session.setAttribute(ACCUMULATOR, a);
+ }
+ return a;
+ }
+
+ private static final String ACCUMULATOR = Accumulator.class.getName();
+
+ private static class Accumulator
+ {
+ private final AMQFrame[] frames = new AMQFrame[3];
+ private int i;
+
+ void received(IoSession session, AMQFrame frame)
+ {
+ frames[i++] = frame;
+ if(i >= frames.length)
+ {
+ i = 0;
+ session.write(new CompositeAMQDataBlock(frames));
+ }
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ int port = argv.length > 0 ? Integer.parseInt(argv[0]) : 8888;
+ new Server(port);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java b/qpid/java/client/src/old_test/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java
new file mode 100644
index 0000000000..cac0064785
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.config.ConnectionFactoryInitialiser;
+import org.apache.qpid.config.ConnectorConfig;
+
+import javax.jms.ConnectionFactory;
+
+class AMQConnectionFactoryInitialiser implements ConnectionFactoryInitialiser
+{
+ public ConnectionFactory getFactory(ConnectorConfig config)
+ {
+ return new AMQConnectionFactory(config.getHost(), config.getPort(), "/test_path");
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/config/AbstractConfig.java b/qpid/java/client/src/old_test/java/org/apache/qpid/config/AbstractConfig.java
new file mode 100644
index 0000000000..04381d66a0
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/config/AbstractConfig.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+public abstract class AbstractConfig
+{
+ public boolean setOptions(String[] argv)
+ {
+ try
+ {
+ for(int i = 0; i < argv.length - 1; i += 2)
+ {
+ String key = argv[i];
+ String value = argv[i+1];
+ setOption(key, value);
+ }
+ return true;
+ }
+ catch(Exception e)
+ {
+ System.out.println(e.getMessage());
+ }
+ return false;
+ }
+
+ protected int parseInt(String msg, String i)
+ {
+ try
+ {
+ return Integer.parseInt(i);
+ }
+ catch(NumberFormatException e)
+ {
+ throw new RuntimeException(msg + ": " + i);
+ }
+ }
+
+ protected long parseLong(String msg, String i)
+ {
+ try
+ {
+ return Long.parseLong(i);
+ }
+ catch(NumberFormatException e)
+ {
+ throw new RuntimeException(msg + ": " + i);
+ }
+ }
+
+ public abstract void setOption(String key, String value);
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java b/qpid/java/client/src/old_test/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java
new file mode 100644
index 0000000000..a9984eb09a
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+public interface ConnectionFactoryInitialiser
+{
+ public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException;
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/config/Connector.java b/qpid/java/client/src/old_test/java/org/apache/qpid/config/Connector.java
new file mode 100644
index 0000000000..ff2377f087
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/config/Connector.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+public class Connector
+{
+ public Connection createConnection(ConnectorConfig config) throws Exception
+ {
+ return getConnectionFactory(config).createConnection();
+ }
+
+ ConnectionFactory getConnectionFactory(ConnectorConfig config) throws Exception
+ {
+ String factory = config.getFactory();
+ if(factory == null) factory = AMQConnectionFactoryInitialiser.class.getName();
+ System.out.println("Using " + factory);
+ return ((ConnectionFactoryInitialiser) Class.forName(factory).newInstance()).getFactory(config);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/config/ConnectorConfig.java b/qpid/java/client/src/old_test/java/org/apache/qpid/config/ConnectorConfig.java
new file mode 100644
index 0000000000..b120ed3f12
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/config/ConnectorConfig.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+public interface ConnectorConfig
+{
+ public String getHost();
+ public int getPort();
+ public String getFactory();
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/qpid/java/client/src/old_test/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
new file mode 100644
index 0000000000..1c86aea56c
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import org.apache.qpid.config.ConnectionFactoryInitialiser;
+import org.apache.qpid.config.ConnectorConfig;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.MBeanException;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.naming.NameNotFoundException;
+import java.util.Hashtable;
+
+public class JBossConnectionFactoryInitialiser implements ConnectionFactoryInitialiser
+{
+ public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException
+ {
+ ConnectionFactory cf = null;
+ InitialContext ic = null;
+ Hashtable ht = new Hashtable();
+ ht.put(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+ String jbossHost = System.getProperty("jboss.host", "eqd-lxamq01");
+ String jbossPort = System.getProperty("jboss.port", "1099");
+ ht.put(InitialContext.PROVIDER_URL, "jnp://" + jbossHost + ":" + jbossPort);
+ ht.put(InitialContext.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
+
+ try
+ {
+ ic = new InitialContext(ht);
+ if (!doesDestinationExist("topictest.messages", ic))
+ {
+ deployTopic("topictest.messages", ic);
+ }
+ if (!doesDestinationExist("topictest.control", ic))
+ {
+ deployTopic("topictest.control", ic);
+ }
+
+ cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
+ return cf;
+ }
+ catch (NamingException e)
+ {
+ JMSException jmse = new JMSException("Unable to lookup object: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ catch (Exception e)
+ {
+ JMSException jmse = new JMSException("Error creating topic: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ }
+
+ private boolean doesDestinationExist(String name, InitialContext ic) throws Exception
+ {
+ try
+ {
+ ic.lookup("/" + name);
+ }
+ catch (NameNotFoundException e)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ private void deployTopic(String name, InitialContext ic) throws Exception
+ {
+ MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic);
+
+ ObjectName serverObjectName = new ObjectName("jboss.messaging:service=ServerPeer");
+
+ String jndiName = "/" + name;
+ try
+ {
+ mBeanServer.invoke(serverObjectName, "createTopic",
+ new Object[]{name, jndiName},
+ new String[]{"java.lang.String", "java.lang.String"});
+ }
+ catch (MBeanException e)
+ {
+ System.err.println("Error: " + e);
+ System.err.println("Cause: " + e.getCause());
+ }
+ }
+
+ private MBeanServerConnection lookupMBeanServerProxy(InitialContext ic) throws NamingException
+ {
+ return (MBeanServerConnection) ic.lookup("jmx/invoker/RMIAdaptor");
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java
new file mode 100644
index 0000000000..cb8adae18c
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.flow;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+
+public class ChannelFlowTest implements MessageListener
+{
+ private int sent;
+ private int received;
+
+ ChannelFlowTest(String broker) throws Exception
+ {
+ this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test"));
+ }
+
+ ChannelFlowTest(AMQConnection connection) throws Exception
+ {
+ this(connection, new AMQQueue(connection.getDefaultQueueExchangeName(), new AMQShortString(randomize("ChannelFlowTest")), true));
+ }
+
+ ChannelFlowTest(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE, 50,25);
+
+ //set up a slow consumer
+ session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+
+ //create a publisher
+ MessageProducer producer = session.createProducer(destination);
+ Message msg = session.createTextMessage("Message");
+
+ //publish in bursts that are fast enough to cause channel flow control
+ for(int i = 0; i < 10; i++)
+ {
+ for(int j = 0; j < 100; j++)
+ {
+ producer.send(msg);
+ sent++;
+ }
+ waitUntilReceived(sent - 40);
+ }
+
+ waitUntilReceived(sent);
+
+ session.close();
+ connection.close();
+ }
+
+
+ private synchronized void waitUntilReceived(int count) throws InterruptedException
+ {
+ while(received <count)
+ {
+ wait();
+ }
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ try
+ {
+ Thread.sleep(50);
+
+ received++;
+ notify();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ new ChannelFlowTest(argv.length == 0 ? "localhost:5672" : argv[0]);
+ }
+
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java b/qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
new file mode 100644
index 0000000000..2fe01fc126
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.fragmentation;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+import org.apache.log4j.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A client that behaves as follows:
+ * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
+ * <li>Creates a temporary queue</li>
+ * <li>Creates messages containing a property that is the name of the temporary queue</li>
+ * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
+ * </ul>
+ */
+public class TestLargePublisher
+{
+ private static final Logger _log = Logger.getLogger(TestLargePublisher.class);
+
+ private AMQConnection _connection;
+
+ private AMQSession _session;
+
+ private class CallbackHandler implements MessageListener
+ {
+ private int _expectedMessageCount;
+
+ private int _actualMessageCount;
+
+ private long _startTime;
+
+ public CallbackHandler(int expectedMessageCount, long startTime)
+ {
+ _expectedMessageCount = expectedMessageCount;
+ _startTime = startTime;
+ }
+
+ public void onMessage(Message m)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message received: " + m);
+ }
+ _actualMessageCount++;
+ if (_actualMessageCount%1000 == 0)
+ {
+ _log.info("Received message count: " + _actualMessageCount);
+ }
+ /*if (!"henson".equals(m.toString()))
+ {
+ _log.error("AbstractJMSMessage response not correct: expected 'henson' but got " + m.toString());
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("AbstractJMSMessage " + m + " received");
+ }
+ else
+ {
+ _log.info("AbstractJMSMessage received");
+ }
+ } */
+
+ if (_actualMessageCount == _expectedMessageCount)
+ {
+ long timeTaken = System.currentTimeMillis() - _startTime;
+ System.out.println("Total time taken to receive " + _expectedMessageCount+ " messages was " +
+ timeTaken + "ms, equivalent to " +
+ (_expectedMessageCount/(timeTaken/1000.0)) + " messages per second");
+ }
+ }
+ }
+
+ public TestLargePublisher(String host, int port, String clientID,
+ final int messageCount) throws AMQException,URLSyntaxException
+ {
+ try
+ {
+ createConnection(host, port, clientID);
+
+ _session = (AMQSession) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic destination = new AMQTopic(_session.getDefaultTopicExchangeName(), new AMQShortString("large"));
+ MessageProducer producer = (MessageProducer) _session.createProducer(destination);
+
+ _connection.start();
+ //TextMessage msg = _session.createTextMessage(tempDestination.getQueueName() + "/Presented to in conjunction with Mahnah Mahnah and the Snowths");
+ final long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ BytesMessage msg = _session.createBytesMessage();
+ populateMessage(msg);
+ producer.send(msg);
+ }
+ _log.info("Finished sending " + messageCount + " messages");
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ private void createConnection(String host, int port, String clientID) throws AMQException , URLSyntaxException
+ {
+ _connection = new AMQConnection(host, port, "guest", "guest",
+ clientID, "/test");
+ }
+
+ private void populateMessage(BytesMessage msg) throws JMSException
+ {
+ int size = 1024 * 187; // 187k
+ byte[] data = new byte[size];
+ for (int i = 0; i < data.length; i++)
+ {
+ data[i] = (byte)(i%25);
+ }
+ msg.writeBytes(data);
+ }
+
+ /**
+ *
+ * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
+ * means the server will allocate a name.
+ */
+ public static void main(String[] args) throws URLSyntaxException
+ {
+ final String host;
+ final int port;
+ final int numMessages;
+ if (args.length == 0)
+ {
+ host = "localhost";
+ port = 5672;
+ numMessages = 100;
+// System.err.println("Usage: TestLargePublisher <host> <port> <number of messages>");
+ }
+ else
+ {
+ host = args[0];
+ port = Integer.parseInt(args[1]);
+ numMessages = Integer.parseInt(args[2]);
+ }
+
+ try
+ {
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+ TestLargePublisher client = new TestLargePublisher(host, port, clientID, numMessages);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ catch (AMQException e)
+ {
+ System.err.println("Error in client: " + e);
+ e.printStackTrace();
+ }
+
+ //System.exit(0);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java b/qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java
new file mode 100644
index 0000000000..b0cde22349
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.fragmentation;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import java.net.InetAddress;
+
+public class TestLargeSubscriber
+{
+ private static final Logger _logger = Logger.getLogger(TestLargeSubscriber.class);
+
+ private static MessageProducer _destinationProducer;
+
+ private static String _destinationName;
+
+ public static void main(String[] args)
+ {
+ _logger.info("Starting...");
+
+ final String host;
+ final int port;
+ final String username;
+ final String password;
+ final String virtualPath;
+ final int numExpectedMessages;
+ if (args.length == 0)
+ {
+ host = "localhost";
+ port = 5672;
+ username = "guest";
+ password = "guest";
+ virtualPath = "/test";
+ numExpectedMessages = 100;
+ }
+ else if (args.length == 6)
+ {
+ host = args[0];
+ port = Integer.parseInt(args[1]);
+ username = args[2];
+ password = args[3];
+ virtualPath = args[4];
+ numExpectedMessages = Integer.parseInt(args[5]);
+ }
+ else
+ {
+ System.out.println("Usage: host port username password virtual-path expectedMessageCount");
+ System.exit(1);
+ throw new RuntimeException("cannot be reached");
+ }
+
+ try
+ {
+ InetAddress address = InetAddress.getLocalHost();
+ AMQConnection con = new AMQConnection(host, port, username, password,
+ address.getHostName(), virtualPath);
+ final AMQSession session = (AMQSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final int expectedMessageCount = numExpectedMessages;
+
+ MessageConsumer consumer = session.createConsumer(new AMQTopic(session.getDefaultTopicExchangeName(),
+ new AMQShortString("large")),
+ 100, true, false, null);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int _messageCount;
+
+ private long _startTime = 0;
+
+ public void onMessage(Message message)
+ {
+ validateMessage(message);
+ if (_messageCount++ == 0)
+ {
+ _startTime = System.currentTimeMillis();
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Got message '" + message + "'");
+ }
+ if (_messageCount == expectedMessageCount)
+ {
+ long totalTime = System.currentTimeMillis() - _startTime;
+ _logger.error("Total time to receive " + _messageCount + " messages was " +
+ totalTime + "ms. Rate is " + (_messageCount/(totalTime/1000.0)));
+ }
+ }
+
+ private void validateMessage(Message message)
+ {
+ if (!(message instanceof BytesMessage))
+ {
+ _logger.error("Message is not of correct type - should be BytesMessage and is " +
+ message.getClass());
+ }
+ BytesMessage bm = (BytesMessage) message;
+ final int expectedSize = 1024 * 187; // 187k
+ try
+ {
+ if (bm.getBodyLength() != expectedSize)
+ {
+ _logger.error("Message is not correct length - should be " + expectedSize + " and is " +
+ bm.getBodyLength());
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Failed to validate message: " + e, e);
+ }
+ try
+ {
+ byte[] data = new byte[(int)bm.getBodyLength()];
+ bm.readBytes(data);
+ for (int i = 0; i < data.length; i++)
+ {
+ if (data[i] != (byte)(i%25))
+ {
+ _logger.error("byte " + i + " of message is wrong - should be " + i%25 + " but is " +
+ data[i]);
+ }
+ }
+ _logger.info("***** Validated message successfully");
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Failed to validate message: " + e, e);
+ }
+ }
+ });
+ con.start();
+ }
+ catch (Throwable t)
+ {
+ System.err.println("Fatal error: " + t);
+ t.printStackTrace();
+ }
+
+ System.out.println("Waiting...");
+ }
+}
+
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Listener.java b/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Listener.java
new file mode 100644
index 0000000000..cb5caefc1e
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Listener.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.headers;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.Session;
+//import org.apache.qpid.testutil.Config;
+
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.JMSException;
+
+public class Listener //implements MessageListener
+{
+/* private final AMQConnection _connection;
+ private final MessageProducer _controller;
+ private final AMQSession _session;
+ private final MessageFactory _factory;
+ private int count;
+ private long start;
+
+ Listener(AMQConnection connection, Destination exchange) throws Exception
+ {
+ _connection = connection;
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _factory = new MessageFactory(_session, 0, 19);
+
+ //register for events
+ _factory.createConsumer(exchange).setMessageListener(this);
+ _connection.start();
+
+ _controller = _session.createProducer(exchange);
+ }
+
+ private void shutdown()
+ {
+ try
+ {
+ _session.close();
+ _connection.stop();
+ _connection.close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private void report()
+ {
+ try
+ {
+ String msg = getReport();
+ _controller.send(_factory.createReportResponseMessage(msg));
+ System.out.println("Sent report: " + msg);
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private String getReport() throws JMSException
+ {
+ long time = (System.currentTimeMillis() - start);
+ return "Received " + count + " in " + time + "ms";
+ }
+
+ public void onMessage(Message message)
+ {
+ if(count == 0) start = System.currentTimeMillis();
+
+ if(_factory.isShutdown(message))
+ {
+ shutdown();
+ }
+ else if(_factory.isReport(message))
+ {
+ //send a report:
+ report();
+ }
+ else if (++count % 100 == 0)
+ {
+ System.out.println("Received " + count + " messages.");
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config();
+ config.setType(Config.HEADERS);
+ config.setName("test_headers_exchange");
+ config.setOptions(argv);
+ new Listener((AMQConnection) config.getConnection(), config.getDestination());
+ }*/
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/headers/MessageFactory.java b/qpid/java/client/src/old_test/java/org/apache/qpid/headers/MessageFactory.java
new file mode 100644
index 0000000000..a2d575fdd4
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/headers/MessageFactory.java
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.headers;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+/**
+ */
+class MessageFactory
+{
+ private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+
+ private final AMQSession _session;
+ private final byte[] _payload;
+
+ private String[] _headerNames;
+
+ MessageFactory(AMQSession session)
+ {
+ this(session, Integer.getInteger("amqj.test.message_size", 256).intValue(), 5);
+ }
+
+ MessageFactory(AMQSession session, int payloadSize, int headerCount)
+ {
+ if (headerCount < 1)
+ {
+ throw new IllegalArgumentException("Header count must be positive");
+ }
+ _session = session;
+ _payload = new byte[payloadSize];
+ for (int i = 0; i < _payload.length; i++)
+ {
+ _payload[i] = (byte) DATA[i % DATA.length];
+ }
+ _headerNames = new String[headerCount];
+ // note that with the standard encoding the headers get prefixed with an S to indicate their type
+ for (int i = 0; i < _headerNames.length; i++)
+ {
+ if (i < 10)
+ {
+ _headerNames[i] = "F000" + i;
+ }
+ else if (i >= 10 && i < 100)
+ {
+ _headerNames[i] = "F00" + i;
+ }
+ else
+ {
+ _headerNames[i] = "F0" + i;
+ }
+ }
+ }
+
+ Message createEventMessage() throws JMSException
+ {
+ BytesMessage msg = _session.createBytesMessage();
+ if (_payload.length != 0)
+ {
+ msg.writeBytes(_payload);
+ }
+ return setHeaders(msg, _headerNames);
+ }
+
+ Message createShutdownMessage() throws JMSException
+ {
+ return setHeaders(_session.createMessage(), new String[]{"F0000", "SHUTDOWN"});
+ }
+
+ Message createReportRequestMessage() throws JMSException
+ {
+ return setHeaders(_session.createMessage(), new String[]{"F0000", "REPORT"});
+ }
+
+ Message createReportResponseMessage(String msg) throws JMSException
+ {
+ return setHeaders(_session.createTextMessage(msg), new String[]{"CONTROL", "REPORT"});
+ }
+
+ boolean isShutdown(Message m)
+ {
+ return checkPresent(m, "SHUTDOWN");
+ }
+
+ boolean isReport(Message m)
+ {
+ return checkPresent(m, "REPORT");
+ }
+
+ Object getReport(Message m)
+ {
+ try
+ {
+ return ((TextMessage) m).getText();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+ return e.toString();
+ }
+ }
+
+ FieldTable getConsumerBinding()
+ {
+ FieldTable binding = FieldTableFactory.newFieldTable();
+ binding.setString("SF0000", "value");
+ return binding;
+ }
+
+ FieldTable getControllerBinding()
+ {
+ FieldTable binding = FieldTableFactory.newFieldTable();
+ binding.setString("SCONTROL", "value");
+ return binding;
+ }
+
+ MessageConsumer createConsumer(Destination source) throws Exception
+ {
+ return _session.createConsumer(source, 0, false, true, null, getConsumerBinding());
+ }
+
+ MessageConsumer createController(Destination source) throws Exception
+ {
+ return _session.createConsumer(source, 0, false, true, null, getControllerBinding());
+ }
+
+ private static boolean checkPresent(Message m, String s)
+ {
+ try
+ {
+ return m.getStringProperty(s) != null;
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+ return false;
+ }
+ }
+
+ private static Message setHeaders(Message m, String[] headers) throws JMSException
+ {
+ for (int i = 0; i < headers.length; i++)
+ {
+ // the value in GRM is 5 bytes
+ m.setStringProperty(headers[i], "value");
+ }
+ return m;
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Publisher.java b/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Publisher.java
new file mode 100644
index 0000000000..d9ef702c48
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Publisher.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.headers;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+//import org.apache.qpid.testutil.Config;
+
+import javax.jms.*;
+
+public class Publisher // implements MessageListener
+{
+/* private final Object _lock = new Object();
+ private final AMQConnection _connection;
+ private final AMQSession _session;
+ private final Destination _exchange;
+ private final MessageFactory _factory;
+ private final MessageProducer _publisher;
+ private int _count;
+
+ Publisher(AMQConnection connection, Destination exchange) throws Exception
+ {
+ _connection = connection;
+ _exchange = exchange;
+ _session = (AMQSession) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _factory = new MessageFactory(_session, 0, 19);
+ _publisher = _session.createProducer(_exchange);
+ }
+
+ Publisher(Config config) throws Exception
+ {
+ this((AMQConnection) config.getConnection(), config.getDestination());
+ }
+
+ private void test(int msgCount, int consumerCount) throws Exception
+ {
+ _count = consumerCount;
+ _factory.createController(_exchange).setMessageListener(this);
+ _connection.start();
+ long start = System.currentTimeMillis();
+ publish(msgCount);
+ waitForCompletion(consumerCount);
+ long end = System.currentTimeMillis();
+
+ System.out.println("Completed in " + (end - start) + " ms.");
+
+ //request shutdown
+ _publisher.send(_factory.createShutdownMessage());
+
+ _connection.stop();
+ _connection.close();
+ }
+
+ private void publish(int count) throws Exception
+ {
+
+ //send events
+ for (int i = 0; i < count; i++)
+ {
+ _publisher.send(_factory.createEventMessage());
+ if ((i + 1) % 100 == 0)
+ {
+ System.out.println("Sent " + (i + 1) + " messages");
+ }
+ }
+
+ //request report
+ _publisher.send(_factory.createReportRequestMessage());
+ }
+
+ private void waitForCompletion(int consumers) throws Exception
+ {
+ System.out.println("Waiting for completion...");
+ synchronized (_lock)
+ {
+ while (_count > 0)
+ {
+ _lock.wait();
+ }
+ }
+ }
+
+
+ public void onMessage(Message message)
+ {
+ System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining");
+ if (_count == 0)
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+ }
+
+
+ public static void main(String[] argv) throws Exception
+ {
+ if (argv.length >= 2)
+ {
+ int msgCount = Integer.parseInt(argv[argv.length - 2]);
+ int consumerCount = Integer.parseInt(argv[argv.length - 1]);
+
+ Config config = new Config();
+ config.setType(Config.HEADERS);
+ config.setName("test_headers_exchange");
+ String[] options = new String[argv.length - 2];
+ System.arraycopy(argv, 0, options, 0, options.length);
+ config.setOptions(options);
+
+ new Publisher(config).test(msgCount, consumerCount);
+ }
+
+ }*/
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Bind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Bind.java
new file mode 100644
index 0000000000..ee6a12c233
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Bind.java
@@ -0,0 +1,273 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceable;
+
+import org.apache.qpid.client.*;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+import javax.naming.*;
+
+import java.util.Properties;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Binds a reference from a JNDI source.
+ * Given a properties file with the JNDI information and a binding string.
+ */
+public class Bind
+{
+ private static final String USAGE="USAGE: java bind <JNDI Properties file> -cf <url> <binding> | -c <url> <binding> [-t <topic Name> <binding>] [-q <queue Name> <binding>]";
+ public Bind(String propertiesFile, String bindingURL, Referenceable reference) throws NameAlreadyBoundException, NoInitialContextException
+ {
+ // Set up the environment for creating the initial context
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+ {
+ qpid_home += "/";
+ }
+
+ try
+ {
+ InputStream inputStream = new FileInputStream(qpid_home + propertiesFile);
+ Properties properties = new Properties();
+ properties.load(inputStream);
+
+ // Create the initial context
+ Context ctx = new InitialContext(properties);
+
+ // Perform the binds
+ ctx.bind(bindingURL, reference);
+
+ // Close the context when we're done
+ ctx.close();
+ }
+ catch (IOException ioe)
+ {
+ System.out.println("Unable to access properties file:" + propertiesFile + " Due to:" + ioe);
+ }
+ catch (NamingException e)
+ {
+ System.out.println("Operation failed: " + e);
+ if (e instanceof NameAlreadyBoundException)
+ {
+ throw (NameAlreadyBoundException) e;
+ }
+
+ if (e instanceof NoInitialContextException)
+ {
+ throw (NoInitialContextException) e;
+ }
+ }
+
+ }
+
+ private static String parse(String[] args, int index, String what, String type)
+ {
+ try
+ {
+ return args[index];
+ }
+ catch (IndexOutOfBoundsException ioobe)
+ {
+ System.out.println("ERROR: No " + what + " specified for " + type + ".");
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ // The path is either return normally or exception.. which calls system exit so keep the compiler happy
+ return "Never going to happen";
+ }
+
+
+ public static void main(String[] args) throws NameAlreadyBoundException, NoInitialContextException, URLSyntaxException, AMQException, JMSException
+ {
+
+
+ org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.OFF);
+
+// org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(AMQConnection.class);
+// _logger.setLevel(org.apache.log4j.Level.OFF);
+
+ boolean exit = false;
+
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ exit = true;
+ }
+
+ if (args.length <= 2)
+ {
+ System.out.println("At least a connection or connection factory must be requested to be bound.");
+ exit = true;
+ }
+ else
+ {
+ if ((args.length - 1) % 3 != 0)
+ {
+ System.out.println("Not all values have full details");
+ exit = true;
+ }
+ }
+ if (exit)
+ {
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+
+ {
+ qpid_home += "/";
+ }
+
+ AMQConnectionFactory cf = null;
+ AMQConnection c = null;
+ AMQSession session = null;
+ Referenceable reference = null;
+
+ for (int index = 1; index < args.length; index ++)
+ {
+ String obj = args[index];
+
+ String what = "Invalid";
+ String binding;
+
+ if (obj.startsWith("-c"))
+ {
+ boolean isFactory = obj.contains("f");
+
+
+ if (isFactory)
+ {
+ what = "ConnectionFactory";
+ }
+ else
+ {
+ what = "Factory";
+ }
+
+ String url = parse(args, ++index, "url", what);
+
+ if (isFactory)
+ {
+
+ cf = new AMQConnectionFactory(url);
+ reference = cf;
+ }
+ else
+ {
+ c = new AMQConnection(url);
+ reference = c;
+ }
+
+ }
+
+ if (obj.equals("-t") || obj.equals("-q"))
+ {
+ if (c == null)
+ {
+ c = (AMQConnection) cf.createConnection();
+ }
+
+ if (session == null)
+ {
+ session = (AMQSession) c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ }
+
+ if (obj.equals("-t"))
+ {
+
+ String topicName = parse(args, ++index, "Topic Name", "Topic");
+ reference = (AMQTopic) session.createTopic(topicName);
+ what = "Topic";
+ }
+ else
+ {
+ if (obj.equals("-q"))
+ {
+ String topicName = parse(args, ++index, "Queue Name", "Queue");
+ reference = (AMQQueue) session.createQueue(topicName);
+ what = "Queue";
+ }
+ }
+
+ binding = parse(args, ++index, "binding", what);
+ if (binding == null)
+ {
+ System.out.println(obj + " is not a known Object to bind.");
+ System.exit(1);
+ }
+ else
+ {
+ System.out.print("Binding:" + reference + " to " + binding);
+ try
+ {
+ new Bind(args[0], binding, reference);
+ System.out.println(" ..Successful");
+
+ }
+ catch (NameAlreadyBoundException nabe)
+ {
+ System.out.println("");
+ if (!obj.startsWith("-c") || index == args.length - 1)
+ {
+ throw nabe;
+ }
+ else
+ {
+ System.out.println("Continuing with other bindings using the same connection details");
+ }
+ }
+ finally
+ {
+ if (!obj.startsWith("-c") || index == args.length - 1)
+ {
+ if (c != null)
+ {
+ c.close();
+ }
+ }
+ }
+ }
+ }
+
+ if (c != null)
+ {
+ c.close();
+ }
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Lookup.java b/qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Lookup.java
new file mode 100644
index 0000000000..1c9d8b0fd5
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Lookup.java
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceable;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Looksup a reference from a JNDI source.
+ * Given a properties file with the JNDI information and a binding string.
+ */
+public class Lookup
+{
+ private static final String USAGE = "USAGE: java lookup <JNDI Properties file> -b <binding>";
+ private Properties _properties;
+ private Object _object;
+
+ public Lookup(String propertiesFile, String bindingValue) throws NamingException
+ {
+ // Set up the environment for creating the initial context
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+ {
+ qpid_home += "/";
+ }
+
+ try
+ {
+ InputStream inputStream = new FileInputStream(qpid_home + propertiesFile);
+ Properties properties = new Properties();
+ properties.load(inputStream);
+
+ _properties = properties;
+ lookup(bindingValue);
+ }
+ catch (IOException ioe)
+ {
+ System.out.println("Unable to access properties file:" + propertiesFile + " Due to:" + ioe);
+ }
+ }
+
+ public Object lookup(String bindingValue) throws NamingException
+ {
+
+ // Create the initial context
+ Context ctx = new InitialContext(_properties);
+
+ // Perform the binds
+ _object = ctx.lookup(bindingValue);
+
+ // Close the context when we're done
+ ctx.close();
+
+ return getObject();
+ }
+
+ public Object getObject()
+ {
+ return _object;
+ }
+
+ private static String parse(String[] args, int index, String what)
+ {
+ try
+ {
+ return args[index];
+ }
+ catch (IndexOutOfBoundsException ioobe)
+ {
+ System.out.println("ERROR: No " + what + " specified.");
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ // The path is either return normally or exception.. which calls system exit so keep the compiler happy
+ return "Never going to happen";
+ }
+
+
+ public static void main(String[] args) throws NamingException
+ {
+ boolean exit = false;
+
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ exit = true;
+ }
+
+ if (args.length <= 2)
+ {
+ System.out.println("At least a connection or connection factory must be requested to be bound.");
+ exit = true;
+ }
+ else
+ {
+ if ((args.length - 1) % 2 != 0)
+ {
+ System.out.println("Not all values have full details");
+ exit = true;
+ }
+ }
+ if (exit)
+ {
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+
+ {
+ qpid_home += "/";
+ }
+
+ for (int index = 1; index < args.length; index ++)
+ {
+ String obj = args[index];
+
+
+ if (obj.equals("-b"))
+ {
+ String binding = parse(args, ++index, "binding");
+
+ if (binding == null)
+ {
+ System.out.println("Binding not specified.");
+ System.exit(1);
+ }
+ else
+ {
+ System.out.print("Looking up:" + binding);
+ try
+ {
+ Lookup l = new Lookup(args[0], binding);
+
+ Object object = l.getObject();
+
+ if (object instanceof Connection)
+ {
+ try
+ {
+ ((Connection) object).close();
+ }
+ catch (JMSException jmse)
+ {
+ ;
+ }
+ }
+ }
+ catch (NamingException nabe)
+ {
+ System.out.println("Problem unbinding " + binding + " continuing with other values.");
+ }
+ }
+ }// if -b
+ else
+ {
+ System.out.println("Continuing with other bindings option not known:" + obj);
+ }
+ }//for
+ }//main
+}//class
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Unbind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Unbind.java
new file mode 100644
index 0000000000..1acead674c
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/jndi/referenceable/Unbind.java
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceable;
+
+import javax.naming.*;
+
+import java.util.Properties;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Unbinds a reference from a JNDI source.
+ * Given a properties file with the JNDI information and a binding string.
+ */
+public class Unbind
+{
+ private static final String USAGE = "USAGE: java unbind <JNDI Properties file> -b <binding>";
+
+ public Unbind(String propertiesFile, String bindingValue) throws NamingException
+ {
+ // Set up the environment for creating the initial context
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+ {
+ qpid_home += "/";
+ }
+
+ try
+ {
+ InputStream inputStream = new FileInputStream(qpid_home + propertiesFile);
+ Properties properties = new Properties();
+ properties.load(inputStream);
+
+ // Create the initial context
+ Context ctx = new InitialContext(properties);
+
+ // Perform the binds
+ ctx.unbind(bindingValue);
+
+ // Close the context when we're done
+ ctx.close();
+ }
+ catch (IOException ioe)
+ {
+ System.out.println("Unable to access properties file:" + propertiesFile + " Due to:" + ioe);
+ }
+ }
+
+ private static String parse(String[] args, int index, String what)
+ {
+ try
+ {
+ return args[index];
+ }
+ catch (IndexOutOfBoundsException ioobe)
+ {
+ System.out.println("ERROR: No " + what + " specified.");
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ // The path is either return normally or exception.. which calls system exit so keep the compiler happy
+ return "Never going to happen";
+ }
+
+
+ public static void main(String[] args) throws NamingException
+ {
+ boolean exit = false;
+
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ exit = true;
+ }
+
+ if (args.length <= 2)
+ {
+ System.out.println("At least a connection or connection factory must be requested to be bound.");
+ exit = true;
+ }
+ else
+ {
+ if ((args.length - 1) % 2 != 0)
+ {
+ System.out.println("Not all values have full details");
+ exit = true;
+ }
+ }
+ if (exit)
+ {
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+
+ {
+ qpid_home += "/";
+ }
+
+ for (int index = 1; index < args.length; index ++)
+ {
+ String obj = args[index];
+
+
+ if (obj.equals("-b"))
+ {
+ String binding = parse(args, ++index, "binding");
+
+ if (binding == null)
+ {
+ System.out.println("Binding not specified.");
+ System.exit(1);
+ }
+ else
+ {
+ System.out.print("UnBinding:" + binding);
+ try
+ {
+ new Unbind(args[0], binding);
+ System.out.println(" ..Successful");
+ }
+ catch (NamingException nabe)
+ {
+ System.out.println("");
+
+ System.out.println("Problem unbinding " + binding + " continuing with other values.");
+ }
+ }
+ }// if -b
+ else
+ {
+ System.out.println("Continuing with other bindings option not known:" + obj);
+ }
+ }//for
+ }//main
+}//class
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java
new file mode 100644
index 0000000000..4865a68dc4
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/latency/LatencyTest.java
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.latency;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.jms.BytesMessage;
+
+public class LatencyTest implements MessageListener
+{
+ private volatile boolean waiting;
+ private int sent;
+ private int received;
+
+ private final byte[] data;
+
+ private long min = Long.MAX_VALUE;
+ private long max = 0;
+ private long total = 0;
+
+ LatencyTest(String broker, int count, int delay, int length) throws Exception
+ {
+ this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test"), count, delay, length);
+ }
+
+ LatencyTest(AMQConnection connection, int count, int delay, int length) throws Exception
+ {
+ this(connection, new AMQQueue(connection.getDefaultQueueExchangeName(), new AMQShortString(randomize("LatencyTest")), true), count, delay, length);
+ }
+
+ LatencyTest(AMQConnection connection, AMQDestination destination, int count, int delay, int length) throws Exception
+ {
+ AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ data = new byte[length];
+ for(int i = 0; i < data.length; i++)
+ {
+ data[i] = (byte) (i % 100);
+ }
+
+ //set up a consumer
+ session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+
+ //create a publisher
+ MessageProducer producer = session.createProducer(destination, false, false, true);
+
+ //publish at a low volume
+ for(int i = 0; i < count; i++)
+ {
+ BytesMessage msg = session.createBytesMessage();
+ msg.writeBytes(data);
+ msg.setStringProperty("sent-at", Long.toString(System.nanoTime()));
+ producer.send(msg);
+ Thread.sleep(delay);
+ if(++sent % 100 == 0)
+ {
+ System.out.println("Sent " + sent + " of " + count);
+ }
+ }
+
+ waitUntilReceived(sent);
+
+ session.close();
+ connection.close();
+
+ System.out.println("Latency (in nanoseconds): avg=" + (total/sent) + ", min=" + min + ", max=" + max
+ + ", avg(discarding min and max)=" + ((total - min - max) / (sent - 2)));
+ }
+
+
+ private synchronized void waitUntilReceived(int count) throws InterruptedException
+ {
+ waiting = true;
+ while(received < count)
+ {
+ wait();
+ }
+ waiting = false;
+ }
+
+ public void onMessage(Message message)
+ {
+ received++;
+ try
+ {
+ long sent = Long.parseLong(message.getStringProperty("sent-at"));
+ long time = System.nanoTime() - sent;
+ total += time;
+ min = Math.min(min, time);
+ max = Math.max(max, time);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+
+ if(waiting){
+ synchronized(this)
+ {
+ notify();
+ }
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ String host = argv.length > 0 ? argv[0] : "localhost:5672";
+ if("-help".equals(host))
+ {
+ System.out.println("Usage: <broker> <message count> <delay between messages> <message size>");
+ }
+ int count = argv.length > 1 ? Integer.parseInt(argv[1]) : 1000;
+ int delay = argv.length > 2 ? Integer.parseInt(argv[2]) : 1000;
+ int size = argv.length > 3 ? Integer.parseInt(argv[3]) : 512;
+ new LatencyTest(host, count, delay, size);
+ }
+
+
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
new file mode 100644
index 0000000000..f0ac0e6902
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.mina;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests MINA socket performance. This acceptor simply reads data from the network and writes it back again.
+ *
+ */
+public class AcceptorTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(AcceptorTest.class);
+
+ public static int PORT = 9999;
+
+ private static class TestHandler extends IoHandlerAdapter
+ {
+ private int _sentCount;
+
+ private int _bytesSent;
+
+ public void messageReceived(IoSession session, Object message) throws Exception
+ {
+ ((ByteBuffer) message).acquire();
+ session.write(message);
+ _logger.debug("Sent response " + ++_sentCount);
+ _bytesSent += ((ByteBuffer)message).remaining();
+ _logger.debug("Bytes sent: " + _bytesSent);
+ }
+
+ public void messageSent(IoSession session, Object message) throws Exception
+ {
+ //((ByteBuffer) message).release();
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+ {
+ _logger.error("Error: " + cause, cause);
+ }
+ }
+
+ public void testStartAcceptor() throws IOException
+ {
+ IoAcceptor acceptor = null;
+ acceptor = new SocketAcceptor();
+
+ SocketAcceptorConfig config = (SocketAcceptorConfig) acceptor.getDefaultConfig();
+ SocketSessionConfig sc = (SocketSessionConfig) config.getSessionConfig();
+ sc.setTcpNoDelay(true);
+ sc.setSendBufferSize(32768);
+ sc.setReceiveBufferSize(32768);
+
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
+
+ acceptor.bind(new InetSocketAddress(PORT),
+ new TestHandler());
+ _logger.info("Bound on port " + PORT);
+ }
+
+ public static void main(String[] args) throws IOException
+ {
+ AcceptorTest a = new AcceptorTest();
+ a.testStartAcceptor();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(AcceptorTest.class);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java
new file mode 100644
index 0000000000..bfe29c47e6
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.mina;
+
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import junit.framework.TestCase;
+
+public class BlockingAcceptorTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(BlockingAcceptorTest.class);
+
+ public static int PORT = 9999;
+
+ public void testStartAcceptor() throws IOException
+ {
+
+ ServerSocket sock = new ServerSocket(PORT);
+
+ sock.setReuseAddress(true);
+ sock.setReceiveBufferSize(32768);
+ _logger.info("Bound on port " + PORT);
+
+ while (true)
+ {
+ final Socket s = sock.accept();
+ _logger.info("Received connection from " + s.getRemoteSocketAddress());
+ s.setReceiveBufferSize(32768);
+ s.setSendBufferSize(32768);
+ s.setTcpNoDelay(true);
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ byte[] chunk = new byte[32768];
+ try
+ {
+ InputStream is = s.getInputStream();
+ OutputStream os = s.getOutputStream();
+
+ while (true)
+ {
+ int count = is.read(chunk, 0, chunk.length);
+ if (count > 0)
+ {
+ os.write(chunk, 0, count);
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ _logger.error("Error - closing connection: " + e, e);
+ }
+ }
+ }, "SocketReaderWriter").start();
+ }
+ }
+
+ public static void main(String[] args) throws IOException
+ {
+ BlockingAcceptorTest a = new BlockingAcceptorTest();
+ a.testStartAcceptor();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(AcceptorTest.class);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java
new file mode 100644
index 0000000000..910345624f
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java
@@ -0,0 +1,271 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.mina;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.*;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.TestCase;
+
+public class WriterTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(WriterTest.class);
+
+ private static class RunnableWriterTest implements Runnable
+ {
+ private Logger _logger;
+
+ private IoSession _session;
+
+ private long _startTime;
+
+ private long[] _chunkTimes;
+
+ private int _chunkCount = 500000;
+
+ private int _chunkSize = 1024;
+
+ private CountDownLatch _notifier;
+
+ public RunnableWriterTest(Logger logger)
+ {
+ _logger = logger;
+ }
+
+ public void run()
+ {
+ _startTime = System.currentTimeMillis();
+ _notifier = new CountDownLatch(1);
+ for (int i = 0; i < _chunkCount; i++)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false);
+ byte check = (byte) (i % 128);
+ buf.put(check);
+ buf.fill((byte)88, buf.remaining());
+ buf.flip();
+ _session.write(buf);
+ }
+
+ try
+ {
+ _logger.info("All buffers sent; waiting for receipt from server");
+ _notifier.await();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ _logger.info("Completed");
+ long totalTime = System.currentTimeMillis() - _startTime;
+ _logger.info("Total time: " + totalTime);
+ _logger.info("MB per second: " + (_chunkSize * _chunkCount)/totalTime);
+ long lastChunkTime = _startTime;
+ double average = 0;
+ for (int i = 0; i < _chunkTimes.length; i++)
+ {
+ if (i == 0)
+ {
+ average = _chunkTimes[i] - _startTime;
+ }
+ else
+ {
+ long delta = _chunkTimes[i] - lastChunkTime;
+ if (delta != 0)
+ {
+ average = (average + delta)/2;
+ }
+ }
+ lastChunkTime = _chunkTimes[i];
+ }
+ _logger.info("Average chunk time: " + average + "ms");
+ CloseFuture cf = _session.close();
+ cf.join();
+ }
+
+ private class WriterHandler extends IoHandlerAdapter
+ {
+ private int _chunksReceived = 0;
+
+ private int _partialBytesRead = 0;
+
+ private byte _partialCheckNumber;
+
+ private int _totalBytesReceived = 0;
+
+ public void messageReceived(IoSession session, Object message) throws Exception
+ {
+ ByteBuffer result = (ByteBuffer) message;
+ _totalBytesReceived += result.remaining();
+ int size = result.remaining();
+ long now = System.currentTimeMillis();
+ if (_partialBytesRead > 0)
+ {
+ int offset = _chunkSize - _partialBytesRead;
+ if (size >= offset)
+ {
+ _chunkTimes[_chunksReceived++] = now;
+ result.position(offset);
+ }
+ else
+ {
+ // have not read even one chunk, including the previous partial bytes
+ _partialBytesRead += size;
+ return;
+ }
+ }
+
+ int chunkCount = result.remaining()/_chunkSize;
+
+ for (int i = 0; i < chunkCount; i++)
+ {
+ _chunkTimes[_chunksReceived++] = now;
+ byte check = result.get();
+ _logger.debug("Check number " + check + " read");
+ if (check != (byte)((_chunksReceived - 1)%128))
+ {
+ _logger.error("Check number " + check + " read when expected " + (_chunksReceived%128));
+ }
+ _logger.debug("Chunk times recorded");
+
+ try
+ {
+ result.skip(_chunkSize - 1);
+ }
+ catch (IllegalArgumentException e)
+ {
+ _logger.error("Position was: " + result.position());
+ _logger.error("Tried to skip to: " + (_chunkSize * i));
+ _logger.error("limit was; " + result.limit());
+ }
+ }
+ _logger.debug("Chunks received now " + _chunksReceived);
+ _logger.debug("Bytes received: " + _totalBytesReceived);
+ _partialBytesRead = result.remaining();
+
+ if (_partialBytesRead > 0)
+ {
+ _partialCheckNumber = result.get();
+ }
+
+ if (_chunksReceived >= _chunkCount)
+ {
+ _notifier.countDown();
+ }
+
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+ {
+ _logger.error("Error: " + cause, cause);
+ }
+ }
+
+ public void startWriter(int chunkSize) throws IOException, InterruptedException
+ {
+ _chunkSize = chunkSize;
+
+ IoConnector ioConnector = null;
+
+ ioConnector = new SocketConnector();
+
+ SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+ cfg.setThreadModel(ThreadModel.MANUAL);
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ scfg.setTcpNoDelay(true);
+ scfg.setSendBufferSize(32768);
+ scfg.setReceiveBufferSize(32768);
+
+ final InetSocketAddress address = new InetSocketAddress("localhost", AcceptorTest.PORT);
+ _logger.info("Attempting connection to " + address);
+ ConnectFuture future = ioConnector.connect(address, new WriterHandler());
+ // wait for connection to complete
+ future.join();
+ _logger.info("Connection completed");
+ // we call getSession which throws an IOException if there has been an error connecting
+ _session = future.getSession();
+ _chunkTimes = new long[_chunkCount];
+ Thread t = new Thread(this);
+ t.start();
+ t.join();
+ _logger.info("Test completed");
+ }
+ }
+
+ private RunnableWriterTest _runnableWriterTest = new RunnableWriterTest(_logger);
+
+ public void test1k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 1k test");
+ _runnableWriterTest.startWriter(1024);
+ }
+
+ public void test2k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 2k test");
+ _runnableWriterTest.startWriter(2048);
+ }
+
+ public void test4k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 4k test");
+ _runnableWriterTest.startWriter(4096);
+ }
+
+ public void test8k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 8k test");
+ _runnableWriterTest.startWriter(8192);
+ }
+
+ public void test16k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 16k test");
+ _runnableWriterTest.startWriter(16384);
+ }
+
+ public void test32k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 32k test");
+ _runnableWriterTest.startWriter(32768);
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException
+ {
+ WriterTest w = new WriterTest();
+ //w.test1k();
+ //w.test2k();
+ //w.test4k();
+ w.test8k();
+ //w.test16k();
+ //w.test32k();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(WriterTest.class);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java
new file mode 100644
index 0000000000..db02b9954a
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java
@@ -0,0 +1,269 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.multiconsumer;
+
+import java.io.ByteArrayOutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.Session;
+
+/**
+ * Test AMQ.
+ */
+public class AMQTest extends TestCase implements ExceptionListener
+{
+
+ private final static String COMPRESSION_PROPNAME = "_MSGAPI_COMP";
+ private final static String UTF8 = "UTF-8";
+ private static final String SUBJECT = "test.amq";
+ private static final String DUMMYCONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ private static final String HUGECONTENT;
+
+ private AMQConnection connect = null;
+ private Session pubSession = null;
+ private Session subSession = null;
+ private Topic topic = null;
+
+ static
+ {
+ StringBuilder sb = new StringBuilder(DUMMYCONTENT.length() * 115);
+ for (int i = 0; i < 100; i++)
+ {
+ sb.append(DUMMYCONTENT);
+ }
+ HUGECONTENT = sb.toString();
+ }
+
+ private void setup() throws Exception
+ {
+ connect = new AMQConnection("localhost", 5672, "guest", "guest", "client1", "/");
+ connect.setExceptionListener(this);
+ pubSession = connect.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ subSession = connect.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ topic = new AMQTopic(pubSession.getDefaultTopicExchangeName(), new AMQShortString(SUBJECT));
+
+ connect.start();
+ }
+
+ public void testMultipleListeners() throws Exception
+ {
+ setup();
+ try
+ {
+ // Create 5 listeners
+ MsgHandler[] listeners = new MsgHandler[5];
+ for (int i = 0; i < listeners.length; i++)
+ {
+ listeners[i] = new MsgHandler();
+ MessageConsumer subscriber = subSession.createConsumer(topic);
+ subscriber.setMessageListener(listeners[i]);
+ }
+ MessageProducer publisher = pubSession.createProducer(topic);
+ // Send a single message
+ TextMessage msg = pubSession.createTextMessage();
+ msg.setText(DUMMYCONTENT);
+ publisher.send(msg);
+ Thread.sleep(5000);
+ // Check listeners to ensure they all got it
+ for (int i = 0; i < listeners.length; i++)
+ {
+ if (listeners[i].isGotIt())
+ {
+ System.out.println("Got callback for listener " + i);
+ }
+ else
+ {
+ TestCase.fail("Listener " + i + " did not get callback");
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ System.err.println("Error: " + e);
+ e.printStackTrace(System.err);
+ }
+ finally
+ {
+ close();
+ }
+ }
+
+ public void testCompression() throws Exception
+ {
+ setup();
+ String comp = this.compressString(HUGECONTENT);
+ try
+ {
+ MsgHandler listener = new MsgHandler();
+ MessageConsumer subscriber = subSession.createConsumer(topic);
+ subscriber.setMessageListener(listener);
+ MessageProducer publisher = pubSession.createProducer(topic);
+
+ // Send a single message
+ TextMessage msg = pubSession.createTextMessage();
+ // Set the compressed text
+ msg.setText(comp);
+ msg.setBooleanProperty(COMPRESSION_PROPNAME, true);
+ publisher.send(msg);
+ Thread.sleep(1000);
+ // Check listeners to ensure we got it
+ if (listener.isGotIt())
+ {
+ System.out.println("Got callback for listener");
+ }
+ else
+ {
+ TestCase.fail("Listener did not get callback");
+ }
+ }
+ finally
+ {
+ close();
+ }
+ }
+
+ private void close() throws Exception
+ {
+ if (connect != null)
+ {
+ connect.close();
+ }
+ }
+
+ private class MsgHandler implements MessageListener
+ {
+ private boolean gotIt = false;
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ TextMessage textMessage = (TextMessage) msg;
+ String string = textMessage.getText();
+ if (string != null && string.length() > 0)
+ {
+ gotIt = true;
+ }
+ if (textMessage.getBooleanProperty(COMPRESSION_PROPNAME))
+ {
+ string = inflateString(string);
+ }
+ System.out.println("Got callback of size " + (string==null?0:string.length()));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public boolean isGotIt()
+ {
+ return this.gotIt;
+ }
+ }
+
+ private String compressString(String string) throws Exception
+ {
+ long start = System.currentTimeMillis();
+ byte[] input = string.getBytes();
+ Deflater compressor = new Deflater(Deflater.BEST_COMPRESSION);
+ compressor.setInput(input);
+ compressor.finish();
+
+ // Get byte array from output of compressor
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(input.length);
+ byte[] buf = new byte[1024];
+ while (!compressor.finished())
+ {
+ int cnt = compressor.deflate(buf);
+ baos.write(buf, 0, cnt);
+ }
+ baos.close();
+ byte[] output = baos.toByteArray();
+
+ // Convert byte array into String
+ byte[] base64 = Base64.encodeBase64(output);
+ String sComp = new String(base64, UTF8);
+
+ long diff = System.currentTimeMillis() - start;
+ System.out.println("Compressed text from " + input.length + " to "
+ + sComp.getBytes().length + " in " + diff + " ms");
+ System.out.println("Compressed text = '" + sComp + "'");
+
+ return sComp;
+ }
+
+ private String inflateString(String string) throws Exception
+ {
+ byte[] input = string.getBytes();
+
+ // First convert Base64 string back to binary array
+ byte[] bytes = Base64.decodeBase64(input);
+
+ // Set string as input data for decompressor
+ Inflater decompressor = new Inflater();
+ decompressor.setInput(bytes);
+
+ // Decompress the data
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(input.length);
+ byte[] buf = new byte[1024];
+ while (!decompressor.finished())
+ {
+ int count = decompressor.inflate(buf);
+ bos.write(buf, 0, count);
+ }
+ bos.close();
+ byte[] output = bos.toByteArray();
+
+ // Get the decompressed data
+ return new String(output, UTF8);
+ }
+
+ /**
+ * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
+ */
+ public void onException(JMSException e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace(System.err);
+ }
+
+
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java b/qpid/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java
new file mode 100644
index 0000000000..37b4ff1498
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java
@@ -0,0 +1,176 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.pubsub1;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A client that behaves as follows:
+ * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
+ * <li>Creates a temporary queue</li>
+ * <li>Creates messages containing a property that is the name of the temporary queue</li>
+ * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
+ * </ul>
+ *
+ */
+public class TestPublisher
+{
+ private static final Logger _log = Logger.getLogger(TestPublisher.class);
+
+ private AMQConnection _connection;
+
+ private Session _session;
+
+ private class CallbackHandler implements MessageListener
+ {
+ private int _expectedMessageCount;
+
+ private int _actualMessageCount;
+
+ private long _startTime;
+
+ public CallbackHandler(int expectedMessageCount, long startTime)
+ {
+ _expectedMessageCount = expectedMessageCount;
+ _startTime = startTime;
+ }
+
+ public void onMessage(Message m)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message received: " + m);
+ }
+ _actualMessageCount++;
+ if (_actualMessageCount%1000 == 0)
+ {
+ _log.info("Received message count: " + _actualMessageCount);
+ }
+ /*if (!"henson".equals(m.toString()))
+ {
+ _log.error("AbstractJMSMessage response not correct: expected 'henson' but got " + m.toString());
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("AbstractJMSMessage " + m + " received");
+ }
+ else
+ {
+ _log.info("AbstractJMSMessage received");
+ }
+ } */
+
+ if (_actualMessageCount == _expectedMessageCount)
+ {
+ long timeTaken = System.currentTimeMillis() - _startTime;
+ System.out.println("Total time taken to receive " + _expectedMessageCount+ " messages was " +
+ timeTaken + "ms, equivalent to " +
+ (_expectedMessageCount/(timeTaken/1000.0)) + " messages per second");
+ }
+ }
+ }
+
+ public TestPublisher(String host, int port, String clientID, String commandQueueName,
+ final int messageCount) throws AMQException, URLSyntaxException
+ {
+ try
+ {
+ createConnection(host, port, clientID);
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic destination = new AMQTopic(_session.getDefaultTopicExchangeName(), new AMQShortString(commandQueueName));
+ MessageProducer producer = (MessageProducer) _session.createProducer(destination);
+
+ _connection.start();
+ //TextMessage msg = _session.createTextMessage(tempDestination.getQueueName() + "/Presented to in conjunction with Mahnah Mahnah and the Snowths");
+ final long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ TextMessage msg = _session.createTextMessage(destination.getTopicName() + "/Presented to in conjunction with Mahnah Mahnah and the Snowths: " + i);
+
+ //msg.setIntProperty("a",i % 2);
+ //msg.setIntProperty("b",i % 4);
+
+ producer.send(msg);
+ }
+ _log.info("Finished sending " + messageCount + " messages");
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ private void createConnection(String host, int port, String clientID) throws AMQException, URLSyntaxException
+ {
+ _connection = new AMQConnection(host, port, "guest", "guest",
+ clientID, "/test");
+ }
+
+ /**
+ *
+ * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
+ * means the server will allocate a name.
+ */
+ public static void main(String[] args) throws URLSyntaxException
+ {
+ if (args.length == 0)
+ {
+ System.err.println("Usage: TestPublisher <host> <port> <command queue name> <number of messages>");
+ }
+ try
+ {
+ int port = Integer.parseInt(args[1]);
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+ TestPublisher client = new TestPublisher(args[0], port, clientID, args[2], Integer.parseInt(args[3]));
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ catch (AMQException e)
+ {
+ System.err.println("Error in client: " + e);
+ e.printStackTrace();
+ }
+
+ //System.exit(0);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java b/qpid/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java
new file mode 100644
index 0000000000..450d9b3914
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestSubscriber.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.pubsub1;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Topic;
+import java.net.InetAddress;
+
+public class TestSubscriber
+{
+ private static final Logger _logger = Logger.getLogger(TestSubscriber.class);
+
+ private static class TestMessageListener implements MessageListener
+ {
+ private String _name;
+
+ private int _expectedMessageCount;
+
+ private int _messageCount;
+
+ private long _startTime = 0;
+
+ public TestMessageListener(String name, int expectedMessageCount)
+ {
+ _name = name;
+ _expectedMessageCount = expectedMessageCount;
+ }
+
+ public void onMessage(javax.jms.Message message)
+ {
+ if (_messageCount++ == 0)
+ {
+ _startTime = System.currentTimeMillis();
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info(_name + " got message '" + message + "'");
+ }
+ if (_messageCount == _expectedMessageCount)
+ {
+ long totalTime = System.currentTimeMillis() - _startTime;
+ _logger.error(_name + ": Total time to receive " + _messageCount + " messages was " +
+ totalTime + "ms. Rate is " + (_messageCount/(totalTime/1000.0)));
+ }
+ if (_messageCount > _expectedMessageCount)
+ {
+ _logger.error("Oops! More messages received than expected (" + _messageCount + ")");
+ }
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ _logger.info("Starting...");
+
+ if (args.length != 7)
+ {
+ System.out.println("Usage: host port username password virtual-path expectedMessageCount selector");
+ System.exit(1);
+ }
+ try
+ {
+ InetAddress address = InetAddress.getLocalHost();
+ AMQConnection con1 = new AMQConnection(args[0], Integer.parseInt(args[1]), args[2], args[3],
+ address.getHostName(), args[4]);
+ final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ AMQConnection con2 = new AMQConnection(args[0], Integer.parseInt(args[1]), args[2], args[3],
+ address.getHostName(), args[4]);
+ final Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String selector = args[6];
+
+ final int expectedMessageCount = Integer.parseInt(args[5]);
+ _logger.info("Message selector is <" + selector + ">...");
+
+ Topic t = new AMQTopic(session1.getDefaultTopicExchangeName(), new AMQShortString("cbr"));
+ MessageConsumer consumer1 = session1.createConsumer(t,
+ 100, false, false, selector);
+ MessageConsumer consumer2 = session2.createConsumer(t,
+ 100, false, false, selector);
+
+ consumer1.setMessageListener(new TestMessageListener("ML 1", expectedMessageCount));
+ consumer2.setMessageListener(new TestMessageListener("ML 2", expectedMessageCount));
+ con1.start();
+ con2.start();
+ }
+ catch (Throwable t)
+ {
+ System.err.println("Fatal error: " + t);
+ t.printStackTrace();
+ }
+
+ System.out.println("Waiting...");
+ }
+}
+
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java
new file mode 100644
index 0000000000..f59b36166a
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/client/connection/TestManyConnections.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.log4j.Logger;
+
+import junit.framework.TestCase;
+
+public class TestManyConnections extends TestCase
+{
+ private static final Logger _log = Logger.getLogger(TestManyConnections.class);
+
+ private AMQConnection[] _connections;
+
+ private void createConnection(int index, String brokerHosts, String clientID, String username, String password,
+ String vpath) throws AMQException, URLSyntaxException
+ {
+ _connections[index] = new AMQConnection(brokerHosts, username, password,
+ clientID, vpath);
+ }
+
+ private void createConnections(int count) throws AMQException, URLSyntaxException
+ {
+ _connections = new AMQConnection[count];
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < count; i++)
+ {
+ createConnection(i, "vm://:1", "myClient" + i, "guest", "guest", "test");
+ }
+ long endTime = System.currentTimeMillis();
+ _log.info("Time to create " + count + " connections: " + (endTime - startTime) +
+ "ms");
+ }
+
+ public void testCreate10Connections() throws AMQException, URLSyntaxException
+ {
+ createConnections(10);
+ }
+
+ public void testCreate50Connections() throws AMQException, URLSyntaxException
+ {
+ createConnections(50);
+ }
+
+ public void testCreate100Connections() throws AMQException, URLSyntaxException
+ {
+ createConnections(100);
+ }
+
+ public void testCreate250Connections() throws AMQException, URLSyntaxException
+ {
+ createConnections(250);
+ }
+
+ public void testCreate500Connections() throws AMQException, URLSyntaxException
+ {
+ createConnections(500);
+ }
+
+ public void testCreate1000Connections() throws AMQException, URLSyntaxException
+ {
+ createConnections(1000);
+ }
+
+ public void testCreate5000Connections() throws AMQException, URLSyntaxException
+ {
+ createConnections(5000);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TestManyConnections.class);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
new file mode 100644
index 0000000000..5ab5722146
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.jndi;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Properties;
+import java.io.InputStream;
+
+
+import junit.framework.TestCase;
+
+public class PropertiesFileInitialContextFactoryTest extends TestCase
+{
+ InitialContextFactory contextFactory;
+ Properties _properties;
+ Properties _fileProperties;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ //create simple set of hardcoded props
+ _properties = new Properties();
+ _properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+ _properties.put("connectionfactory.local", "amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'");
+ _properties.put("queue.MyQueue", "example.MyQueue");
+ _properties.put("topic.ibmStocks", "stocks.nyse.ibm");
+ _properties.put("destination.direct", "direct://amq.direct//directQueue");
+
+ //create properties from file as a more realistic test
+ _fileProperties = new Properties();
+ ClassLoader cl = this.getClass().getClassLoader();
+ InputStream is = cl.getResourceAsStream("org/apache/qpid/test/unit/jndi/example.properties");
+ _fileProperties.load(is);
+ }
+
+ /**
+ * Test using hardcoded properties
+ */
+ public void testWithoutFile()
+ {
+ Context ctx = null;
+
+ try
+ {
+ ctx = new InitialContext(_properties);
+ }
+ catch (NamingException ne)
+ {
+ fail("Error loading context:" + ne);
+ }
+
+ checkPropertiesMatch(ctx, "Using hardcoded properties: ");
+ }
+
+ /**
+ * Test using properties from example file
+ */
+ public void testWithFile()
+ {
+ Context ctx = null;
+
+ try
+ {
+ ctx = new InitialContext(_fileProperties);
+ }
+ catch (Exception e)
+ {
+ fail("Error loading context:" + e);
+ }
+
+ checkPropertiesMatch(ctx, "Using properties from file: ");
+ }
+
+ public void tearDown()
+ {
+ _properties = null;
+ _fileProperties = null;
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(PropertiesFileInitialContextFactoryTest.class);
+ }
+
+ private void checkPropertiesMatch(Context ctx, String errorInfo)
+ {
+ try
+ {
+ AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local");
+ assertEquals("amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'", cf.getConnectionURL().toString());
+ }
+ catch (NamingException ne)
+ {
+ fail(errorInfo + "Unable to create Connection Factory:" + ne);
+ }
+
+ try
+ {
+ AMQQueue queue = (AMQQueue) ctx.lookup("MyQueue");
+ assertEquals("example.MyQueue", queue.getRoutingKey().toString());
+ }
+ catch (NamingException ne)
+ {
+ fail(errorInfo + "Unable to create queue:" + ne);
+ }
+
+ try
+ {
+ AMQTopic topic = (AMQTopic) ctx.lookup("ibmStocks");
+ assertEquals("stocks.nyse.ibm", topic.getTopicName().toString());
+ }
+ catch (Exception ne)
+ {
+ fail(errorInfo + "Unable to create topic:" + ne);
+ }
+
+ try
+ {
+ AMQQueue direct = (AMQQueue) ctx.lookup("direct");
+ assertEquals("directQueue", direct.getRoutingKey().toString());
+ }
+ catch (NamingException ne)
+ {
+ fail(errorInfo + "Unable to create direct destination:" + ne);
+ }
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/example.properties b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/example.properties
new file mode 100644
index 0000000000..ea9dc5ae0e
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/example.properties
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+
+# use the following property to configure the default connector
+#java.naming.provider.url - ignored.
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'
+
+# register some queues in JNDI using the form
+# queue.[jndiName] = [physicalName]
+queue.MyQueue = example.MyQueue
+
+# register some topics in JNDI using the form
+# topic.[jndiName] = [physicalName]
+topic.ibmStocks = stocks.nyse.ibm
+
+# Register an AMQP destination in JNDI
+# NOTE: Qpid currently only supports direct,topics and headers
+# destination.[jniName] = [BindingURL]
+destination.direct = direct://amq.direct//directQueue
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/topic/Config.java b/qpid/java/client/src/old_test/java/org/apache/qpid/topic/Config.java
new file mode 100644
index 0000000000..bb740f9094
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/topic/Config.java
@@ -0,0 +1,243 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.config.ConnectorConfig;
+import org.apache.qpid.config.ConnectionFactoryInitialiser;
+import org.apache.qpid.config.Connector;
+import org.apache.qpid.config.AbstractConfig;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+class Config extends AbstractConfig implements ConnectorConfig
+{
+
+ private String host = "localhost";
+ private int port = 5672;
+ private String factory = null;
+
+ private int payload = 256;
+ private int messages = 1000;
+ private int clients = 1;
+ private int batch = 1;
+ private long delay = 1;
+ private int warmup;
+ private int ackMode= AMQSession.NO_ACKNOWLEDGE;
+ private String clientId;
+ private String subscriptionId;
+ private boolean persistent;
+
+ public Config()
+ {
+ }
+
+ int getAckMode()
+ {
+ return ackMode;
+ }
+
+ void setPayload(int payload)
+ {
+ this.payload = payload;
+ }
+
+ int getPayload()
+ {
+ return payload;
+ }
+
+ void setClients(int clients)
+ {
+ this.clients = clients;
+ }
+
+ int getClients()
+ {
+ return clients;
+ }
+
+ void setMessages(int messages)
+ {
+ this.messages = messages;
+ }
+
+ int getMessages()
+ {
+ return messages;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public void setHost(String host)
+ {
+ this.host = host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public String getFactory()
+ {
+ return factory;
+ }
+
+ public void setPort(int port)
+ {
+ this.port = port;
+ }
+
+ int getBatch()
+ {
+ return batch;
+ }
+
+ void setBatch(int batch)
+ {
+ this.batch = batch;
+ }
+
+ int getWarmup()
+ {
+ return warmup;
+ }
+
+ void setWarmup(int warmup)
+ {
+ this.warmup = warmup;
+ }
+
+ public long getDelay()
+ {
+ return delay;
+ }
+
+ public void setDelay(long delay)
+ {
+ this.delay = delay;
+ }
+
+ String getClientId()
+ {
+ return clientId;
+ }
+
+ String getSubscriptionId()
+ {
+ return subscriptionId;
+ }
+
+ boolean usePersistentMessages()
+ {
+ return persistent;
+ }
+
+ public void setOption(String key, String value)
+ {
+ if("-host".equalsIgnoreCase(key))
+ {
+ setHost(value);
+ }
+ else if("-port".equalsIgnoreCase(key))
+ {
+ try
+ {
+ setPort(Integer.parseInt(value));
+ }
+ catch(NumberFormatException e)
+ {
+ throw new RuntimeException("Bad port number: " + value);
+ }
+ }
+ else if("-payload".equalsIgnoreCase(key))
+ {
+ setPayload(parseInt("Bad payload size", value));
+ }
+ else if("-messages".equalsIgnoreCase(key))
+ {
+ setMessages(parseInt("Bad message count", value));
+ }
+ else if("-clients".equalsIgnoreCase(key))
+ {
+ setClients(parseInt("Bad client count", value));
+ }
+ else if("-batch".equalsIgnoreCase(key))
+ {
+ setBatch(parseInt("Bad batch count", value));
+ }
+ else if("-delay".equalsIgnoreCase(key))
+ {
+ setDelay(parseLong("Bad batch delay", value));
+ }
+ else if("-warmup".equalsIgnoreCase(key))
+ {
+ setWarmup(parseInt("Bad warmup count", value));
+ }
+ else if("-ack".equalsIgnoreCase(key))
+ {
+ ackMode = parseInt("Bad ack mode", value);
+ }
+ else if("-factory".equalsIgnoreCase(key))
+ {
+ factory = value;
+ }
+ else if("-clientId".equalsIgnoreCase(key))
+ {
+ clientId = value;
+ }
+ else if("-subscriptionId".equalsIgnoreCase(key))
+ {
+ subscriptionId = value;
+ }
+ else if("-persistent".equalsIgnoreCase(key))
+ {
+ persistent = "true".equalsIgnoreCase(value);
+ }
+ else
+ {
+ System.out.println("Ignoring unrecognised option: " + key);
+ }
+ }
+
+ static String getAckModeDescription(int ackMode)
+ {
+ switch(ackMode)
+ {
+ case AMQSession.NO_ACKNOWLEDGE: return "NO_ACKNOWLEDGE";
+ case AMQSession.AUTO_ACKNOWLEDGE: return "AUTO_ACKNOWLEDGE";
+ case AMQSession.CLIENT_ACKNOWLEDGE: return "CLIENT_ACKNOWLEDGE";
+ case AMQSession.DUPS_OK_ACKNOWLEDGE: return "DUPS_OK_ACKNOWELDGE";
+ case AMQSession.PRE_ACKNOWLEDGE: return "PRE_ACKNOWLEDGE";
+ }
+ return "AckMode=" + ackMode;
+ }
+
+ public Connection createConnection() throws Exception
+ {
+ return new Connector().createConnection(this);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/topic/Listener.java b/qpid/java/client/src/old_test/java/org/apache/qpid/topic/Listener.java
new file mode 100644
index 0000000000..47c608cfe4
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/topic/Listener.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+public class Listener implements MessageListener
+{
+ private final Connection _connection;
+ private final MessageProducer _controller;
+ private final javax.jms.Session _session;
+ private final MessageFactory _factory;
+ private boolean init;
+ private int count;
+ private long start;
+
+ Listener(Connection connection, int ackMode) throws Exception
+ {
+ this(connection, ackMode, null);
+ }
+
+ Listener(Connection connection, int ackMode, String name) throws Exception
+ {
+ _connection = connection;
+ _session = connection.createSession(false, ackMode);
+ _factory = new MessageFactory(_session);
+
+ //register for events
+ if(name == null)
+ {
+ _factory.createTopicConsumer().setMessageListener(this);
+ }
+ else
+ {
+ _factory.createDurableTopicConsumer(name).setMessageListener(this);
+ }
+
+ _connection.start();
+
+ _controller = _factory.createControlPublisher();
+ System.out.println("Waiting for messages " +
+ Config.getAckModeDescription(ackMode)
+ + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")")
+ + "...");
+
+ }
+
+ private void shutdown()
+ {
+ try
+ {
+ _session.close();
+ _connection.stop();
+ _connection.close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private void report()
+ {
+ try
+ {
+ String msg = getReport();
+ _controller.send(_factory.createReportResponseMessage(msg));
+ System.out.println("Sent report: " + msg);
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private String getReport()
+ {
+ long time = (System.currentTimeMillis() - start);
+ return "Received " + count + " in " + time + "ms";
+ }
+
+ public void onMessage(Message message)
+ {
+ if(!init)
+ {
+ start = System.currentTimeMillis();
+ count = 0;
+ init = true;
+ }
+
+ if(_factory.isShutdown(message))
+ {
+ shutdown();
+ }
+ else if(_factory.isReport(message))
+ {
+ //send a report:
+ report();
+ init = false;
+ }
+ else if (++count % 100 == 0)
+ {
+ System.out.println("Received " + count + " messages.");
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config();
+ config.setOptions(argv);
+
+ Connection con = config.createConnection();
+ if(config.getClientId() != null)
+ {
+ con.setClientID(config.getClientId());
+ }
+ new Listener(con, config.getAckMode(), config.getSubscriptionId());
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java b/qpid/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java
new file mode 100644
index 0000000000..39d64069d1
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/topic/MessageFactory.java
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.*;
+
+/**
+ */
+class MessageFactory
+{
+ private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+
+ private final Session _session;
+ private final Topic _topic;
+ private final Topic _control;
+ private final byte[] _payload;
+
+
+ MessageFactory(Session session) throws JMSException
+ {
+ this(session, 256);
+ }
+
+ MessageFactory(Session session, int size) throws JMSException
+ {
+ _session = session;
+ if(session instanceof AMQSession)
+ {
+ _topic = new AMQTopic(((AMQSession)session).getDefaultTopicExchangeName(),new AMQShortString("topictest.messages"));
+ _control = new AMQTopic(((AMQSession)session).getDefaultTopicExchangeName(),new AMQShortString("topictest.control"));
+ }
+ else
+ {
+ _topic = session.createTopic("topictest.messages");
+ _control = session.createTopic("topictest.control");
+ }
+ _payload = new byte[size];
+
+ for(int i = 0; i < size; i++)
+ {
+ _payload[i] = (byte) DATA[i % DATA.length];
+ }
+ }
+
+ Topic getTopic()
+ {
+ return _topic;
+ }
+
+ Message createEventMessage() throws JMSException
+ {
+ BytesMessage msg = _session.createBytesMessage();
+ msg.writeBytes(_payload);
+ return msg;
+ }
+
+ Message createShutdownMessage() throws JMSException
+ {
+ return _session.createTextMessage("SHUTDOWN");
+ }
+
+ Message createReportRequestMessage() throws JMSException
+ {
+ return _session.createTextMessage("REPORT");
+ }
+
+ Message createReportResponseMessage(String msg) throws JMSException
+ {
+ return _session.createTextMessage(msg);
+ }
+
+ boolean isShutdown(Message m)
+ {
+ return checkText(m, "SHUTDOWN");
+ }
+
+ boolean isReport(Message m)
+ {
+ return checkText(m, "REPORT");
+ }
+
+ Object getReport(Message m)
+ {
+ try
+ {
+ return ((TextMessage) m).getText();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+ return e.toString();
+ }
+ }
+
+ MessageConsumer createTopicConsumer() throws Exception
+ {
+ return _session.createConsumer(_topic);
+ }
+
+ MessageConsumer createDurableTopicConsumer(String name) throws Exception
+ {
+ return _session.createDurableSubscriber(_topic, name);
+ }
+
+ MessageConsumer createControlConsumer() throws Exception
+ {
+ return _session.createConsumer(_control);
+ }
+
+ MessageProducer createTopicPublisher() throws Exception
+ {
+ return _session.createProducer(_topic);
+ }
+
+ MessageProducer createControlPublisher() throws Exception
+ {
+ return _session.createProducer(_control);
+ }
+
+ private static boolean checkText(Message m, String s)
+ {
+ try
+ {
+ return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+ return false;
+ }
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/topic/Publisher.java b/qpid/java/client/src/old_test/java/org/apache/qpid/topic/Publisher.java
new file mode 100644
index 0000000000..d788029ee9
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/topic/Publisher.java
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import javax.jms.*;
+
+public class Publisher implements MessageListener
+{
+ private final Object _lock = new Object();
+ private final Connection _connection;
+ private final Session _session;
+ private final MessageFactory _factory;
+ private final MessageProducer _publisher;
+ private int _count;
+
+ Publisher(Connection connection, int size, int ackMode, boolean persistent) throws Exception
+ {
+ _connection = connection;
+ _session = _connection.createSession(false, ackMode);
+ _factory = new MessageFactory(_session, size);
+ _publisher = _factory.createTopicPublisher();
+ _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ System.out.println("Publishing " + (persistent ? "persistent" : "non-persistent") + " messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode) + ".");
+ }
+
+ private void test(Config config) throws Exception
+ {
+ test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup());
+ }
+
+ private void test(int batches, long delay, int msgCount, int consumerCount, int warmup) throws Exception
+ {
+ _factory.createControlConsumer().setMessageListener(this);
+ _connection.start();
+
+ if(warmup > 0)
+ {
+ System.out.println("Runing warmup (" + warmup + " msgs)");
+ long time = batch(warmup, consumerCount);
+ System.out.println("Warmup completed in " + time + "ms");
+ }
+
+ long[] times = new long[batches];
+ for(int i = 0; i < batches; i++)
+ {
+ if(i > 0) Thread.sleep(delay*1000);
+ times[i] = batch(msgCount, consumerCount);
+ System.out.println("Batch " + (i+1) + " of " + batches + " completed in " + times[i] + " ms.");
+ }
+
+ long min = min(times);
+ long max = max(times);
+ System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
+
+ //request shutdown
+ _publisher.send(_factory.createShutdownMessage());
+
+ _connection.stop();
+ _connection.close();
+ }
+
+ private long batch(int msgCount, int consumerCount) throws Exception
+ {
+ _count = consumerCount;
+ long start = System.currentTimeMillis();
+ publish(msgCount);
+ waitForCompletion(consumerCount);
+ return System.currentTimeMillis() - start;
+ }
+
+ private void publish(int count) throws Exception
+ {
+
+ //send events
+ for (int i = 0; i < count; i++)
+ {
+ _publisher.send(_factory.createEventMessage());
+ if ((i + 1) % 100 == 0)
+ {
+ System.out.println("Sent " + (i + 1) + " messages");
+ }
+ }
+
+ //request report
+ _publisher.send(_factory.createReportRequestMessage());
+ }
+
+ private void waitForCompletion(int consumers) throws Exception
+ {
+ System.out.println("Waiting for completion...");
+ synchronized (_lock)
+ {
+ while (_count > 0)
+ {
+ _lock.wait();
+ }
+ }
+ }
+
+
+ public void onMessage(Message message)
+ {
+ System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining");
+ if (_count == 0)
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+ }
+
+ static long min(long[] times)
+ {
+ long min = times.length > 0 ? times[0] : 0;
+ for(int i = 0; i < times.length; i++)
+ {
+ min = Math.min(min, times[i]);
+ }
+ return min;
+ }
+
+ static long max(long[] times)
+ {
+ long max = times.length > 0 ? times[0] : 0;
+ for(int i = 0; i < times.length; i++)
+ {
+ max = Math.max(max, times[i]);
+ }
+ return max;
+ }
+
+ static long avg(long[] times, long min, long max)
+ {
+ long sum = 0;
+ for(int i = 0; i < times.length; i++)
+ {
+ sum += times[i];
+ }
+ sum -= min;
+ sum -= max;
+
+ return (sum / (times.length - 2));
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config();
+ config.setOptions(argv);
+
+ Connection con = config.createConnection();
+ int size = config.getPayload();
+ int ackMode = config.getAckMode();
+ boolean persistent = config.usePersistentMessages();
+ new Publisher(con, size, ackMode, persistent).test(config);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java b/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java
new file mode 100644
index 0000000000..bd104e5407
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.config.ConnectorConfig;
+import org.apache.qpid.config.AbstractConfig;
+import org.apache.qpid.config.Connector;
+
+import javax.jms.Connection;
+
+class Config extends AbstractConfig implements ConnectorConfig
+{
+ private String host = "localhost";
+ private int port = 5672;
+ private String factory;
+ private boolean echo;
+ private int batch = 100;
+ private boolean persistent = true;
+
+ Config(String[] argv)
+ {
+ setOptions(argv);
+ }
+
+ Connection createConnection() throws Exception
+ {
+ return new Connector().createConnection(this);
+ }
+
+ public boolean isEchoOn()
+ {
+ return echo;
+ }
+
+ public boolean usePersistentMessages()
+ {
+ return persistent;
+ }
+
+ public int getBatchSize()
+ {
+ return batch;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public String getFactory()
+ {
+ return factory;
+ }
+
+ public void setOption(String key, String value)
+ {
+ if("-host".equalsIgnoreCase(key))
+ {
+ host = value;
+ }
+ else if("-port".equalsIgnoreCase(key))
+ {
+ port = parseInt("Bad port number", value);
+ }
+ else if("-factory".equalsIgnoreCase(key))
+ {
+ factory = value;
+ }
+ else if("-echo".equalsIgnoreCase(key))
+ {
+ echo = "true".equalsIgnoreCase(value);
+ }
+ else if("-persistent".equalsIgnoreCase(key))
+ {
+ persistent = "true".equalsIgnoreCase(value);
+ }
+ else if("-batch".equalsIgnoreCase(key))
+ {
+ batch = parseInt("Bad batch size", value);
+ }
+ else
+ {
+ System.out.println("Ignoring nrecognised option " + key);
+ }
+ }
+
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java b/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
new file mode 100644
index 0000000000..8f15bf089e
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import java.util.Arrays;
+
+public class Ping
+{
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config(argv);
+ Connection con = config.createConnection();
+ con.setClientID("ping");
+ new Relay(new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("ping")), new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("pong")), con,
+ config.isEchoOn(),
+ config.getBatchSize(),
+ config.usePersistentMessages()).start();
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java b/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
new file mode 100644
index 0000000000..f4f4b20d7c
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+public class Pong
+{
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config(argv);
+ Connection con = config.createConnection();
+ con.setClientID("pong");
+ new Relay(new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("pong")), new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("ping")), con,
+ config.isEchoOn(),
+ config.getBatchSize(),
+ config.usePersistentMessages()).start();
+
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java b/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java
new file mode 100644
index 0000000000..cede95e5f0
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Destination;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.DeliveryMode;
+
+class Relay implements Runnable
+{
+ private final Connection _con;
+ private final Session _session;
+ private final MessageConsumer _src;
+ private final MessageProducer _dest;
+ private final int _batch;
+ private final boolean _echo;
+ private int _counter;
+ private long start;
+ private boolean _running;
+
+ Relay(Destination src, Destination dest, Connection con) throws JMSException
+ {
+ this(src, dest, con, false, 100, true);
+ }
+
+ Relay(Destination src, Destination dest, Connection con, boolean echo, int batch, boolean persistent) throws JMSException
+ {
+ _echo = echo;
+ _batch = batch;
+ _con = con;
+ _session = con.createSession(true, AMQSession.NO_ACKNOWLEDGE);
+ _src = _session.createConsumer(src);
+ _dest = _session.createProducer(dest);
+ _dest.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ }
+
+ public void run()
+ {
+ start = System.currentTimeMillis();
+ try{
+ while(true) relay();
+ }
+ catch(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ try
+ {
+ _session.close();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ void relay() throws JMSException
+ {
+ _dest.send(relay(_src.receive()));
+ _session.commit();
+ }
+
+ Message relay(Message in) throws JMSException
+ {
+ if(!_running)
+ {
+ System.out.println(_con.getClientID() + " started.");
+ _running = true;
+ }
+ if(++_counter % _batch == 0)
+ {
+ long time = System.currentTimeMillis() - start;
+ System.out.println(_batch + " iterations performed in " + time + " ms");
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ start = System.currentTimeMillis();
+ }
+ if(_echo)
+ {
+ System.out.println("Received: " + ((TextMessage) in).getText());
+ }
+ return _session.createTextMessage(_con.getClientID() + _counter);
+ }
+
+ void start() throws InterruptedException, JMSException
+ {
+ Thread runner = new Thread(this);
+ runner.start();
+ _con.start();
+ System.out.println(_con.getClientID() + " waiting...");
+ runner.join();
+ _con.close();
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java b/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
new file mode 100644
index 0000000000..de718d828a
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+public class Start
+{
+ public static void main(String[] argv) throws Exception
+ {
+ Connection con = new Config(argv).createConnection();
+ AMQQueue ping = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, new AMQShortString("ping"));
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createProducer(ping).send(session.createTextMessage("start"));
+ session.close();
+ con.close();
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java b/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java
new file mode 100644
index 0000000000..71d806b338
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.weblogic;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+
+import javax.jms.*;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.naming.Context;
+import java.net.InetAddress;
+import java.util.Hashtable;
+
+public class ServiceProvider
+{
+ private static final String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";
+ private static final String JMS_FACTORY = "transientJMSConnectionFactory";
+
+ private static final Logger _logger = Logger.getLogger(ServiceProvider.class);
+
+ private static MessageProducer _destinationProducer;
+
+ private static Queue _destinationQ;
+
+ public static void main(String[] args)
+ {
+ _logger.info("Starting...");
+
+ if (args.length != 2)
+ {
+ System.out.println("Usage: <WLS URI> <service queue>");
+ System.exit(1);
+ }
+ try
+ {
+ String url = args[0];
+ String receiveQueue = args[1];
+
+ final InitialContext ctx = getInitialContext(url);
+
+ QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
+ QueueConnection qcon = qconFactory.createQueueConnection();
+ final QueueSession qsession = qcon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue receiveQ = (Queue) ctx.lookup(receiveQueue);
+
+ _logger.info("Service (queue) name is '" + receiveQ + "'...");
+
+ String selector = (args.length > 2 && args[2] != null && args[2].length() > 1) ? args[2] : null;
+
+ _logger.info("Message selector is <" + selector + ">...");
+
+ MessageConsumer consumer = qsession.createConsumer(receiveQ, selector);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int _messageCount;
+
+ public void onMessage(javax.jms.Message message)
+ {
+ //_logger.info("Got message '" + message + "'");
+
+ TextMessage tm = (TextMessage) message;
+
+ try
+ {
+ Queue responseQueue = (Queue)tm.getJMSReplyTo();
+ if (!responseQueue.equals(_destinationQ))
+ {
+ _destinationQ = responseQueue;
+ _logger.info("Creating destination for " + responseQueue);
+
+ try
+ {
+ _destinationProducer = qsession.createProducer(_destinationQ);
+ _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
+ catch (JMSException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ _messageCount++;
+ if (_messageCount % 1000 == 0)
+ {
+ _logger.info("Received message total: " + _messageCount);
+ _logger.info("Sending response to '" + responseQueue + "'");
+ }
+
+ String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
+ TextMessage msg = qsession.createTextMessage(payload);
+ if (tm.propertyExists("timeSent"))
+ {
+ _logger.info("timeSent property set on message");
+ final long timeSent = tm.getLongProperty("timeSent");
+ msg.setLongProperty("timeSent", timeSent);
+ _logger.info("time taken to go from service request to provider is: " + (System.currentTimeMillis() - timeSent));
+ }
+ _destinationProducer.send(msg);
+ if (_messageCount % 1000 == 0)
+ {
+ tm.acknowledge();
+ _logger.info("Sent response to '" + responseQueue + "'");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error sending message: " + e, e);
+ }
+ }
+ });
+ qcon.start();
+ }
+ catch (Throwable t)
+ {
+ System.err.println("Fatal error: " + t);
+ t.printStackTrace();
+ }
+
+
+ System.out.println("Waiting...");
+ }
+
+ private static InitialContext getInitialContext(String url) throws NamingException
+ {
+ Hashtable env = new Hashtable();
+ env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
+ env.put(Context.PROVIDER_URL, url);
+ return new InitialContext(env);
+ }
+}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java b/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
new file mode 100644
index 0000000000..2f64a1dde5
--- /dev/null
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.weblogic;
+
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Hashtable;
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: U806869
+ * Date: 28-May-2005
+ * Time: 21:54:51
+ * To change this template use File | Settings | File Templates.
+ */
+public class ServiceRequestingClient
+{
+ private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
+ private static final String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";
+ private static final String JMS_FACTORY = "transientJMSConnectionFactory";
+
+ private static class CallbackHandler implements MessageListener
+ {
+ private int _expectedMessageCount;
+
+ private int _actualMessageCount;
+
+ private long _startTime;
+
+ private long _averageLatency;
+
+ public CallbackHandler(int expectedMessageCount, long startTime)
+ {
+ _expectedMessageCount = expectedMessageCount;
+ _startTime = startTime;
+ }
+
+ public void onMessage(Message m)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message received: " + m);
+ }
+ try
+ {
+ if (m.propertyExists("timeSent"))
+ {
+ long timeSent = m.getLongProperty("timeSent");
+ long now = System.currentTimeMillis();
+ if (_averageLatency == 0)
+ {
+ _averageLatency = now - timeSent;
+ _log.info("Latency " + _averageLatency);
+ }
+ else
+ {
+ _log.info("Individual latency: " + (now-timeSent));
+ _averageLatency = (_averageLatency + (now - timeSent))/2;
+ _log.info("Average latency now: " + _averageLatency);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ _log.error("Could not calculate latency");
+ }
+
+ _actualMessageCount++;
+ if (_actualMessageCount%1000 == 0)
+ {
+ try
+ {
+ m.acknowledge();
+ }
+ catch (JMSException e)
+ {
+ _log.error("Error acknowledging message");
+ }
+ _log.info("Received message count: " + _actualMessageCount);
+ }
+ /*if (!"henson".equals(m.toString()))
+ {
+ _log.error("Message response not correct: expected 'henson' but got " + m.toString());
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message " + m + " received");
+ }
+ else
+ {
+ _log.info("Message received");
+ }
+ } */
+
+ if (_actualMessageCount == _expectedMessageCount)
+ {
+ long timeTaken = System.currentTimeMillis() - _startTime;
+ System.out.println("Total time taken to receive " + _expectedMessageCount+ " messages was " +
+ timeTaken + "ms, equivalent to " +
+ (_expectedMessageCount/(timeTaken/1000.0)) + " messages per second");
+ System.out.println("Average latency is: " + _averageLatency);
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length != 3)
+ {
+ System.out.println("Usage: IXPublisher <WLS URL> <sendQueue> <count> will publish count messages to ");
+ System.out.println("queue sendQueue and waits for a response on a temp queue");
+ System.exit(1);
+ }
+
+ String url = args[0];
+ String sendQueue = args[1];
+ int messageCount = Integer.parseInt(args[2]);
+
+ InitialContext ctx = getInitialContext(url);
+
+ QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
+ QueueConnection qcon = qconFactory.createQueueConnection();
+ QueueSession qsession = qcon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue sendQ = (Queue) ctx.lookup(sendQueue);
+ Queue receiveQ = qsession.createTemporaryQueue();
+ QueueSender qsender = qsession.createSender(sendQ);
+ qsender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ _log.debug("Queue sender created for service queue " + sendQ);
+
+ javax.jms.MessageConsumer messageConsumer = (javax.jms.MessageConsumer) qsession.createConsumer(receiveQ);
+
+ //TextMessage msg = _session.createTextMessage(tempDestination.getQueueName() + "/Presented to in conjunction with Mahnah Mahnah and the Snowths");
+ final long startTime = System.currentTimeMillis();
+
+ messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
+ qcon.start();
+ for (int i = 0; i < messageCount; i++)
+ {
+ TextMessage msg = qsession.createTextMessage("/Presented to in conjunction with Mahnah Mahnah and the Snowths:" + i);
+ msg.setJMSReplyTo(receiveQ);
+ if (i%1000 == 0)
+ {
+ long timeNow = System.currentTimeMillis();
+ msg.setLongProperty("timeSent", timeNow);
+ }
+ qsender.send(msg);
+ }
+
+ new Thread("foo").start();
+ //qsession.close();
+ //qcon.close();
+ }
+
+ private static InitialContext getInitialContext(String url) throws NamingException
+ {
+ Hashtable env = new Hashtable();
+ env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
+ env.put(Context.PROVIDER_URL, url);
+ return new InitialContext(env);
+ }
+}