summaryrefslogtreecommitdiff
path: root/qpid/java/client/example
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/example')
-rw-r--r--qpid/java/client/example/build.xml28
-rw-r--r--qpid/java/client/example/src/main/java/README.txt33
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java105
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java74
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java52
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java83
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java335
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java148
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/hello.properties27
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java163
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java138
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java29
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java141
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java105
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java141
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java208
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java59
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java32
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java72
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java123
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java81
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java98
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java29
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java29
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java168
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java81
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java57
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties40
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java263
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java236
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java139
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java47
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java182
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java46
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java171
-rwxr-xr-xqpid/java/client/example/src/main/java/runSample.sh72
36 files changed, 3835 insertions, 0 deletions
diff --git a/qpid/java/client/example/build.xml b/qpid/java/client/example/build.xml
new file mode 100644
index 0000000000..8b0d59bd8a
--- /dev/null
+++ b/qpid/java/client/example/build.xml
@@ -0,0 +1,28 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="AMQ Client" default="build">
+
+ <property name="module.depends" value="client common"/>
+ <property name="module.test.depends" value=""/>
+
+ <import file="../../module.xml"/>
+
+</project>
diff --git a/qpid/java/client/example/src/main/java/README.txt b/qpid/java/client/example/src/main/java/README.txt
new file mode 100644
index 0000000000..757054e492
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/README.txt
@@ -0,0 +1,33 @@
+In order to use the runSample script, you are required to set two environment
+variables, QPID_HOME and QPID_SAMPLE. If not the default values will be used.
+
+QPID_HOME
+---------
+This is the directory that contains the QPID distribution. If you are running the Qpid
+Java broker on the same machine as the examples, you have already set QPID_HOME to this
+directory.
+
+default: /usr/share/java/
+
+QPID_SAMPLE
+-----------
+
+This is the examples directory, which is the parent directory of the
+'java' directory in which you find 'runSample.sh'
+
+(Ex:- $QPID_SRC_HOME/java/client/example/src/main)
+
+default: $PWD
+
+Note: you must have write privileges to this directory in order to run
+the examples.
+
+
+Running the Examples
+===========================
+
+To run these programs, do the following:
+
+ 1. Make sure that a Qpid broker is running.
+ 2. In the java directory, use runSample.sh to run the program:
+ $ ./runSample.sh <class name> <arguments> \ No newline at end of file
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java
new file mode 100644
index 0000000000..b43031ad23
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+import org.slf4j.Logger;
+
+public class Drain extends OptionParser
+{
+
+ static final Option FOREVER = new Option("f",
+ "forever",
+ "ignore timeout and wait forever",
+ null,
+ null,
+ Boolean.class);
+
+ static final Option COUNT = new Option ("c",
+ "count",
+ "read c messages, then exit",
+ "COUNT",
+ "0",
+ Integer.class);
+
+
+ static
+ {
+ optDefs.add(BROKER);
+ optDefs.add(HELP);
+ optDefs.add(TIMEOUT);
+ optDefs.add(FOREVER);
+ optDefs.add(COUNT);
+ optDefs.add(CON_OPTIONS);
+ optDefs.add(BROKER_OPTIONS);
+ }
+
+ public Drain(String[] args, String usage, String desc) throws Exception
+ {
+ super(args, usage, desc);
+
+ Connection con = createConnection();
+ con.start();
+ Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Destination dest = new AMQAnyDestination(address);
+ MessageConsumer consumer = ssn.createConsumer(dest);
+ Message msg;
+
+ long timeout = -1;
+ int count = 0;
+ int i = 0;
+
+ if (containsOp(TIMEOUT)) { timeout = Integer.parseInt(getOp(TIMEOUT))*1000; }
+ if (containsOp(FOREVER)) { timeout = 0; }
+ if (containsOp(COUNT)) { count = Integer.parseInt(getOp(COUNT)); }
+
+ while ((msg = consumer.receive(timeout)) != null)
+ {
+ System.out.println("\n------------- Msg -------------");
+ System.out.println(msg);
+ System.out.println("-------------------------------\n");
+
+ if (count > 0) {
+ if (++i == count) {
+ break;
+ }
+ }
+ }
+
+ ssn.close();
+ con.close();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ String u = "Usage: drain [OPTIONS] 'ADDRESS'";
+ String d = "Drains messages from the specified address.";
+
+ new Drain(args,u,d);
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
new file mode 100644
index 0000000000..949ee4dac6
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.example;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import java.util.Properties;
+
+
+public class Hello
+{
+
+ public Hello()
+ {
+ }
+
+ public static void main(String[] args)
+ {
+ Hello hello = new Hello();
+ hello.runTest();
+ }
+
+ private void runTest()
+ {
+ try {
+ Properties properties = new Properties();
+ properties.load(this.getClass().getResourceAsStream("hello.properties"));
+ Context context = new InitialContext(properties);
+
+ ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionfactory");
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = (Destination) context.lookup("topicExchange");
+
+ MessageProducer messageProducer = session.createProducer(destination);
+ MessageConsumer messageConsumer = session.createConsumer(destination);
+
+ TextMessage message = session.createTextMessage("Hello world!");
+ messageProducer.send(message);
+
+ message = (TextMessage)messageConsumer.receive();
+ System.out.println(message.getText());
+
+ connection.close();
+ context.close();
+ }
+ catch (Exception exp)
+ {
+ exp.printStackTrace();
+ }
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java
new file mode 100644
index 0000000000..89db04f8d3
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.example;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+
+
+public class MapReceiver {
+
+ public static void main(String[] args) throws Exception
+ {
+ Connection connection =
+ new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
+
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MapMessage m = (MapMessage)consumer.receive();
+ System.out.println(m);
+ connection.close();
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java
new file mode 100644
index 0000000000..0ce9383add
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.example;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+
+
+public class MapSender {
+
+ public static void main(String[] args) throws Exception
+ {
+ Connection connection =
+ new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
+ MessageProducer producer = session.createProducer(queue);
+
+ MapMessage m = session.createMapMessage();
+ m.setIntProperty("Id", 987654321);
+ m.setStringProperty("name", "Widget");
+ m.setDoubleProperty("price", 0.99);
+
+ List<String> colors = new ArrayList<String>();
+ colors.add("red");
+ colors.add("green");
+ colors.add("white");
+ m.setObject("colours", colors);
+
+ Map<String,Double> dimensions = new HashMap<String,Double>();
+ dimensions.put("length",10.2);
+ dimensions.put("width",5.1);
+ dimensions.put("depth",2.0);
+ m.setObject("dimensions",dimensions);
+
+ List<List<Integer>> parts = new ArrayList<List<Integer>>();
+ parts.add(Arrays.asList(new Integer[] {1,2,5}));
+ parts.add(Arrays.asList(new Integer[] {8,2,5}));
+ m.setObject("parts", parts);
+
+ Map<String,Object> specs = new HashMap<String,Object>();
+ specs.put("colours", colors);
+ specs.put("dimensions", dimensions);
+ specs.put("parts", parts);
+ m.setObject("specs",specs);
+
+ producer.send(m);
+ connection.close();
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java
new file mode 100644
index 0000000000..f4e17c5c4c
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java
@@ -0,0 +1,335 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.client.AMQConnection;
+
+public class OptionParser
+{
+ static final Option BROKER = new Option("b",
+ "broker",
+ "connect to specified broker",
+ "USER:PASS@HOST:PORT",
+ "guest:guest@localhost:5672",
+ String.class);
+
+ static final Option HELP = new Option("h",
+ "help",
+ "show this help message and exit",
+ null,
+ null,
+ Boolean.class);
+
+ static final Option TIMEOUT = new Option("t",
+ "timeout",
+ "timeout in seconds to wait before exiting",
+ "TIMEOUT",
+ "0",
+ Integer.class);
+
+ static final Option CON_OPTIONS = new Option(null,
+ "con-option",
+ "JMS Connection URL options. Ex sync_ack=true sync_publish=all ",
+ "NAME=VALUE",
+ null,
+ String.class);
+
+
+ static final Option BROKER_OPTIONS = new Option(null,
+ "broker-option",
+ "JMS Broker URL options. Ex ssl=true sasl_mechs=GSSAPI ",
+ "NAME=VALUE",
+ null,
+ String.class);
+
+
+ protected Map<String,Object> optMap = new HashMap<String,Object>();
+ protected static final List<Option> optDefs = new ArrayList<Option>();
+
+ protected String usage;
+ protected String desc;
+ protected String address;
+
+ public OptionParser(String[] args, String usage, String desc)
+ {
+ this.usage = usage;
+ this.desc = desc;
+
+ if (args.length == 0 ||
+ (args.length == 1 && (args[0].equals("-h") || args[0].equals("--help"))))
+ {
+ printHelp();
+ }
+
+ address = args[args.length -1];
+ String[] ops = new String[args.length -1];
+ System.arraycopy(args, 0, ops, 0, ops.length);
+ parseOpts(ops);
+
+ System.out.println(optMap);
+
+ if (isHelp())
+ {
+ printHelp();
+ }
+ }
+
+ public boolean isHelp()
+ {
+ return optMap.containsKey("h") || optMap.containsKey("help");
+ }
+
+ public void printHelp()
+ {
+ System.out.println(String.format("%s\n",usage));
+ System.out.println(String.format("%s\n",desc));
+ System.out.println(String.format("%s\n","Options:"));
+
+ for (Option op : optDefs)
+ {
+ String valueLabel = op.getValueLabel() != null ? "=" + op.getValueLabel() : "";
+ String shortForm = op.getShortForm() != null ? "-" + op.getShortForm() + valueLabel : "";
+ String longForm = op.getLongForm() != null ? "--" + op.getLongForm() + valueLabel : "";
+ String desc = op.getDesc();
+ String defaultValue = op.getDefaultValue() != null ?
+ " (default " + op.getDefaultValue() + ")" : "";
+
+ if (!shortForm.equals(""))
+ {
+ longForm = shortForm + ", " + longForm;
+ }
+ System.out.println(
+ String.format("%-54s%s%s", longForm,desc,defaultValue));
+ }
+
+ System.exit(0);
+ }
+
+ private void parseOpts(String[] args)
+ {
+ String prevOpt = null;
+ for(String op: args)
+ {
+ // covers both -h and --help formats
+ if (op.startsWith("-"))
+ {
+ String key = op.substring(op.startsWith("--")? 2:1 ,
+ (op.indexOf('=') > 0) ?
+ op.indexOf('='):
+ op.length());
+
+ boolean match = false;
+ for (Option option: optDefs)
+ {
+
+ if ((op.startsWith("-") && option.shortForm != null && option.shortForm.equals(key)) ||
+ (op.startsWith("--") && option.longForm != null && option.longForm.equals(key)) )
+ {
+ match = true;
+ break;
+ }
+ }
+
+ if (!match)
+ {
+ System.out.println(op + " is not a valid option");
+ System.exit(0);
+ }
+
+ if (op.indexOf('=') > 0)
+ {
+ String val = extractValue(op.substring(op.indexOf('=')+1));
+ if (optMap.containsKey(key))
+ {
+ optMap.put(key, optMap.get(key) + "," + val);
+ }
+ else
+ {
+ optMap.put(key, val);
+ }
+ }
+ else
+ {
+ if (! optMap.containsKey(key)){ optMap.put(key, ""); }
+ prevOpt = key;
+ }
+ }
+ else if (prevOpt != null) // this is to catch broker localhost:5672 instead broker=localhost:5672
+ {
+ String val = extractValue(op);
+ if (optMap.containsKey(prevOpt) && !optMap.get(prevOpt).toString().equals(""))
+ {
+ optMap.put(prevOpt, optMap.get(prevOpt) + "," + val);
+ }
+ else
+ {
+ optMap.put(prevOpt, val);
+ }
+ prevOpt = null;
+ }
+ else
+ {
+ System.out.println(optMap);
+ throw new IllegalArgumentException(op + " is not a valid option");
+ }
+ }
+ }
+
+ private String extractValue(String op)
+ {
+ if (op.startsWith("'"))
+ {
+ if (!op.endsWith("'"))
+ throw new IllegalArgumentException(" The option " + op + " needs to be inside quotes");
+
+ return op.substring(1,op.length() -1);
+ }
+ else
+ {
+ return op;
+ }
+ }
+
+ protected boolean containsOp(Option op)
+ {
+ return optMap.containsKey(op.shortForm) || optMap.containsKey(op.longForm);
+ }
+
+ protected String getOp(Option op)
+ {
+ if (optMap.containsKey(op.shortForm))
+ {
+ return (String)optMap.get(op.shortForm);
+ }
+ else if (optMap.containsKey(op.longForm))
+ {
+ return (String)optMap.get(op.longForm);
+ }
+ else
+ {
+ return op.getDefaultValue();
+ }
+ }
+
+ protected Connection createConnection() throws Exception
+ {
+ StringBuffer buf;
+ buf = new StringBuffer();
+ buf.append("amqp://");
+ String userPass = "guest:guest";
+ String broker = "localhost:5672";
+ if(containsOp(BROKER))
+ {
+ try
+ {
+ String b = getOp(BROKER);
+ userPass = b.substring(0,b.indexOf('@'));
+ broker = b.substring(b.indexOf('@')+1);
+ }
+ catch (StringIndexOutOfBoundsException e)
+ {
+ Exception ex = new Exception("Error parsing broker string " + getOp(BROKER));
+ ex.initCause(e);
+ throw ex;
+ }
+
+ }
+
+ if(containsOp(BROKER_OPTIONS))
+ {
+ String bOps = getOp(BROKER_OPTIONS);
+ bOps = bOps.replaceAll(",", "'&");
+ bOps = bOps.replaceAll("=", "='");
+ broker = broker + "?" + bOps + "'";
+ }
+ buf.append(userPass);
+ buf.append("@test/test?brokerlist='tcp://");
+ buf.append(broker).append("'");
+ if(containsOp(CON_OPTIONS))
+ {
+ String bOps = getOp(CON_OPTIONS);
+ bOps = bOps.replaceAll(",", "'&");
+ bOps = bOps.replaceAll("=", "='");
+ buf.append("&").append(bOps).append("'");
+ }
+
+ Connection con = new AMQConnection(buf.toString());
+ return con;
+ }
+
+ static class Option
+ {
+ private String shortForm;
+ private String longForm;
+ private String desc;
+ private String valueLabel;
+ private String defaultValue;
+ private Class type;
+
+ public Option(String shortForm, String longForm, String desc,
+ String valueLabel, String defaultValue, Class type)
+ {
+ this.shortForm = shortForm;
+ this.longForm = longForm;
+ this.defaultValue = defaultValue;
+ this.type = type;
+ this.desc = desc;
+ this.valueLabel = valueLabel;
+ }
+
+ public String getShortForm()
+ {
+ return shortForm;
+ }
+
+ public String getLongForm()
+ {
+ return longForm;
+ }
+
+ public String getDefaultValue()
+ {
+ return defaultValue;
+ }
+
+ public Class getType()
+ {
+ return type;
+ }
+
+ public String getDesc()
+ {
+ return desc;
+ }
+
+ public String getValueLabel()
+ {
+ return valueLabel;
+ }
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java
new file mode 100644
index 0000000000..5da319a658
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQAnyDestination;
+
+public class Spout extends OptionParser
+{
+
+ static final Option COUNT = new Option("c",
+ "count",
+ "stop after count messages have been sent, zero disables",
+ "COUNT",
+ "1",
+ Integer.class);
+
+ static final Option ID = new Option("i",
+ "id",
+ "use the supplied id instead of generating one",
+ null,
+ null,
+ Boolean.class);
+
+ static final Option CONTENT = new Option(null,
+ "content",
+ "specify textual content",
+ "TEXT",
+ null,
+ Boolean.class);
+
+ static final Option MSG_PROPERTY = new Option("P",
+ "property",
+ "specify message property",
+ "NAME=VALUE",
+ null,
+ Boolean.class);
+
+ static final Option MAP_ENTRY = new Option("M",
+ "map",
+ "specify entry for map content",
+ "KEY=VALUE",
+ null,
+ Boolean.class);
+
+ static
+ {
+ optDefs.add(BROKER);
+ optDefs.add(HELP);
+ optDefs.add(TIMEOUT);
+ optDefs.add(COUNT);
+ optDefs.add(MSG_PROPERTY);
+ optDefs.add(MAP_ENTRY);
+ optDefs.add(CONTENT);
+ optDefs.add(CON_OPTIONS);
+ optDefs.add(BROKER_OPTIONS);
+ }
+
+ public Spout(String[] args, String usage, String desc) throws Exception
+ {
+ super(args, usage, desc);
+
+ Connection con = createConnection();
+ con.start();
+ Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Destination dest = new AMQAnyDestination(address);
+ MessageProducer producer = ssn.createProducer(dest);
+
+ int count = Integer.parseInt(getOp(COUNT));
+
+ for (int i=0; i < count; i++)
+ {
+ Message msg = createMessage(ssn);
+ producer.send(msg);
+ System.out.println("\n------------- Msg -------------");
+ System.out.println(msg);
+ System.out.println("-------------------------------\n");
+ }
+ ssn.close();
+ con.close();
+ }
+
+ private Message createMessage(Session ssn) throws Exception
+ {
+ if (containsOp(MAP_ENTRY))
+ {
+ MapMessage msg = ssn.createMapMessage();
+ for (String pair: getOp(MAP_ENTRY).split(","))
+ {
+ msg.setString(pair.substring(0, pair.indexOf('=')),
+ pair.substring(pair.indexOf('=') + 1));
+ }
+ setProperties(msg);
+ return msg;
+ }
+ else
+ {
+ Message msg =
+ ssn.createTextMessage(containsOp(CONTENT) ? getOp(CONTENT) : "");
+ setProperties(msg);
+ return msg;
+ }
+ }
+
+ private void setProperties(Message m) throws Exception
+ {
+ if(containsOp(MSG_PROPERTY))
+ {
+ for (String pair: getOp(MSG_PROPERTY).split(","))
+ {
+ m.setStringProperty(pair.substring(0, pair.indexOf('=')),
+ pair.substring(pair.indexOf('=') + 1));
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ String u = "Usage: spout [OPTIONS] 'ADDRESS'";
+ String d = "Send messages to the specified address.";
+
+ new Spout(args,u,d);
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/hello.properties b/qpid/java/client/example/src/main/java/org/apache/qpid/example/hello.properties
new file mode 100644
index 0000000000..27ea66b318
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/hello.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'
+
+# Register an AMQP destination in JNDI
+# destination.[jniName] = [Address Format]
+destination.topicExchange = amq.topic
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
new file mode 100644
index 0000000000..1849f733e9
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.publisher;
+
+import java.io.File;
+
+import javax.jms.JMSException;
+
+
+import org.apache.qpid.example.shared.FileUtils;
+import org.apache.qpid.example.shared.Statics;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Class that sends message files to the Publisher to distribute
+ * using files as input
+ * Must set properties for host in properties file or uses in vm broker
+ */
+public class FileMessageDispatcher
+{
+
+ protected static final Logger _logger = LoggerFactory.getLogger(FileMessageDispatcher.class);
+
+ protected static Publisher _publisher = null;
+
+ /**
+ * To use this main method you need to specify a path or file to use for input
+ * This class then uses file contents from the dir/file specified to generate
+ * messages to publish
+ * Intended to be a very simple way to get going with publishing using the broker
+ * @param args - must specify one value, the path to file(s) for publisher
+ */
+ public static void main(String[] args)
+ {
+
+ // Check command line args ok - must provide a path or file for us to dispatch
+ if (args.length == 0)
+ {
+ System.out.println("Usage: FileMessageDispatcher <filesToDispatch>" + "");
+ }
+ else
+ {
+ try
+ {
+ // publish message(s) from file(s) to configured queue
+ publish(args[0]);
+
+ // Move payload file(s) to archive location as no error
+ FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH));
+ }
+ catch (Exception e)
+ {
+ // log error and exit
+ _logger.error("Error trying to dispatch message: " + e);
+ System.exit(1);
+ }
+ finally
+ {
+ // clean up before exiting
+ if (getPublisher() != null)
+ {
+ getPublisher().cleanup();
+ }
+ }
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Finished dispatching message");
+ }
+
+ System.exit(0);
+ }
+
+ /**
+ * Publish the content of a file or files from a directory as messages
+ * @param path - from main args
+ * @throws JMSException
+ * @throws MessageFactoryException - if cannot create message from file content
+ */
+ public static void publish(String path) throws JMSException, MessageFactoryException
+ {
+ File tempFile = new File(path);
+ if (tempFile.isDirectory())
+ {
+ // while more files in dir publish them
+ File[] files = tempFile.listFiles();
+
+ if ((files == null) || (files.length == 0))
+ {
+ _logger.info("FileMessageDispatcher - No files to publish in input directory: " + tempFile);
+ }
+ else
+ {
+ for (File file : files)
+ {
+ // Create message factory passing in payload path
+ FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString());
+
+ // Send the message generated from the payload using the _publisher
+ getPublisher().sendMessage(factory.createEventMessage());
+
+ }
+ }
+ }
+ else
+ {
+ // handle a single file
+ // Create message factory passing in payload path
+ FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), tempFile.toString());
+
+ // Send the message generated from the payload using the _publisher
+ getPublisher().sendMessage(factory.createEventMessage());
+ }
+ }
+
+ /**
+ * Cleanup before exit
+ */
+ public static void cleanup()
+ {
+ if (getPublisher() != null)
+ {
+ getPublisher().cleanup();
+ }
+ }
+
+ /**
+ * @return A Publisher instance
+ */
+ private static Publisher getPublisher()
+ {
+ if (_publisher != null)
+ {
+ return _publisher;
+ }
+
+ // Create a _publisher
+ _publisher = new Publisher();
+
+ return _publisher;
+ }
+
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
new file mode 100644
index 0000000000..04339b2498
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.example.publisher;
+
+import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.example.shared.Statics;
+
+import java.io.*;
+import javax.jms.*;
+
+public class FileMessageFactory
+{
+ protected final Session _session;
+ protected final String _payload;
+ protected final String _filename;
+
+ /**
+ * Contructs and instance using a filename from which content will be used to create message
+ * @param session
+ * @param filename
+ * @throws MessageFactoryException
+ */
+ public FileMessageFactory(Session session, String filename) throws MessageFactoryException
+ {
+ try
+ {
+ _filename = filename;
+ _payload = FileUtils.readFileAsString(filename);
+ _session = session;
+ }
+ catch (Exception e)
+ {
+ MessageFactoryException mfe = new MessageFactoryException(e.toString(), e);
+ throw mfe;
+ }
+ }
+
+ /**
+ * Creates a text message and sets filename property on it
+ * The filename property is purely intended to provide visibility
+ * of file content passing trhough the broker using example classes
+ * @return Message - a TextMessage with content from file
+ * @throws JMSException
+ */
+ public Message createEventMessage() throws JMSException
+ {
+ TextMessage msg = _session.createTextMessage();
+ msg.setText(_payload);
+ msg.setStringProperty(Statics.FILENAME_PROPERTY, new File(_filename).getName());
+
+ return msg;
+ }
+
+ /**
+ * Creates message from a string for use by the monitor
+ * @param session
+ * @param textMsg - message content
+ * @return Message - TextMessage with content from String
+ * @throws JMSException
+ */
+ public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException
+ {
+ TextMessage msg = session.createTextMessage();
+ msg.setText(textMsg);
+
+ return msg;
+ }
+
+ public Message createShutdownMessage() throws JMSException
+ {
+ return _session.createTextMessage("SHUTDOWN");
+ }
+
+ public Message createReportRequestMessage() throws JMSException
+ {
+ return _session.createTextMessage("REPORT");
+ }
+
+ public Message createReportResponseMessage(String msg) throws JMSException
+ {
+ return _session.createTextMessage(msg);
+ }
+
+ public boolean isShutdown(Message m)
+ {
+ return checkText(m, "SHUTDOWN");
+ }
+
+ public boolean isReport(Message m)
+ {
+ return checkText(m, "REPORT");
+ }
+
+ public Object getReport(Message m)
+ {
+ try
+ {
+ return ((TextMessage) m).getText();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+
+ return e.toString();
+ }
+ }
+
+ private static boolean checkText(Message m, String s)
+ {
+ try
+ {
+ return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+
+ return false;
+ }
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
new file mode 100644
index 0000000000..d709da6432
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.publisher;
+
+public class MessageFactoryException extends Exception
+{
+ public MessageFactoryException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
new file mode 100644
index 0000000000..3d16e01af4
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.publisher;
+
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+
+/**
+ * Class that sends heartbeat messages to allow monitoring of message consumption Sends regular (currently 20 seconds
+ * apart) heartbeat message
+ */
+public class MonitorMessageDispatcher
+{
+
+ private static final Logger _logger = LoggerFactory.getLogger(MonitorMessageDispatcher.class);
+
+ protected static MonitorPublisher _monitorPublisher = null;
+
+ protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher";
+
+ /**
+ * Easy entry point for running a message dispatcher for monitoring consumption
+ * Sends 1000 messages with no delay
+ *
+ * @param args
+ */
+ public static void main(String[] args)
+ {
+ //Switch on logging appropriately for your app
+ try
+ {
+ int i =0;
+ while (i < 1000)
+ {
+ try
+ {
+ //endlessly publish messages to monitor queue
+ publish();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dispatched monitor message");
+ }
+
+ //sleep for twenty seconds and then publish again - change if appropriate
+ //Thread.sleep(1000);
+ i++ ;
+ }
+ catch (UndeliveredMessageException a)
+ {
+ //trigger application specific failure handling here
+ _logger.error("Problem delivering monitor message");
+ break;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error trying to dispatch AMS monitor message: " + e);
+ System.exit(1);
+ }
+ finally
+ {
+ if (getMonitorPublisher() != null)
+ {
+ getMonitorPublisher().cleanup();
+ }
+ }
+
+ System.exit(1);
+ }
+
+ /**
+ * Publish heartbeat message
+ *
+ * @throws JMSException
+ * @throws UndeliveredMessageException
+ */
+ public static void publish() throws JMSException, UndeliveredMessageException
+ {
+ //Send the message generated from the payload using the _publisher
+// getMonitorPublisher().sendImmediateMessage
+// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
+
+ getMonitorPublisher().sendMessage
+ (getMonitorPublisher()._session,
+ FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()),
+ DeliveryMode.PERSISTENT, false, true);
+
+ }
+
+ /** Cleanup publishers */
+ public static void cleanup()
+ {
+ if (getMonitorPublisher() != null)
+ {
+ getMonitorPublisher().cleanup();
+ }
+
+ if (getMonitorPublisher() != null)
+ {
+ getMonitorPublisher().cleanup();
+ }
+ }
+
+ //Returns a _publisher for the monitor queue
+ private static MonitorPublisher getMonitorPublisher()
+ {
+ if (_monitorPublisher != null)
+ {
+ return _monitorPublisher;
+ }
+
+ //Create a _publisher using failover details and constant for monitor queue
+ _monitorPublisher = new MonitorPublisher();
+
+ _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME);
+ return _monitorPublisher;
+ }
+
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
new file mode 100644
index 0000000000..750f57d9dc
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.publisher;
+
+import org.apache.qpid.client.BasicMessageProducer;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+/**
+ * Subclass of Publisher which uses QPID functionality to send a heartbeat message Note immediate flag not available via
+ * JMS MessageProducer
+ */
+public class MonitorPublisher extends Publisher
+{
+
+ private static final Logger _log = LoggerFactory.getLogger(Publisher.class);
+
+ BasicMessageProducer _producer;
+
+ public MonitorPublisher()
+ {
+ super();
+ }
+
+ /*
+ * Publishes a message using given details
+ */
+ public boolean sendMessage(Session session, Message message, int deliveryMode,
+ boolean immediate, boolean commit) throws UndeliveredMessageException
+ {
+ try
+ {
+ _producer = (BasicMessageProducer) session.createProducer(_destination);
+
+ _producer.send(message, deliveryMode, immediate);
+
+ if (commit)
+ {
+ //commit the message send and close the transaction
+ _session.commit();
+ }
+
+ }
+ catch (JMSException e)
+ {
+ //Have to assume our commit failed but do not rollback here as channel closed
+ _log.error("JMSException", e);
+ e.printStackTrace();
+ throw new UndeliveredMessageException("Cannot deliver immediate message", e);
+ }
+
+ _log.info(_name + " finished sending message: " + message);
+ return true;
+ }
+
+ /*
+ * Publishes a non-persistent message using transacted session
+ */
+ public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException
+ {
+ try
+ {
+ _producer = (BasicMessageProducer) _session.createProducer(_destination);
+
+ //Send message via our producer which is not persistent and is immediate
+ //NB: not available via jms interface MessageProducer
+ _producer.send(message, DeliveryMode.NON_PERSISTENT, true);
+
+ //commit the message send and close the transaction
+ _session.commit();
+
+ }
+ catch (JMSException e)
+ {
+ //Have to assume our commit failed but do not rollback here as channel closed
+ _log.error("JMSException", e);
+ e.printStackTrace();
+ throw new UndeliveredMessageException("Cannot deliver immediate message", e);
+ }
+
+ _log.info(_name + " finished sending message: " + message);
+ return true;
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java
new file mode 100644
index 0000000000..a92efe99ac
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.publisher;
+
+import java.io.File;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+
+import org.apache.qpid.example.shared.FileUtils;
+import org.apache.qpid.example.shared.Statics;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Class that sends parameterised number of message files to the Publisher
+ * Must set properties for host in properties file or uses in vm broker
+ */
+public class MultiMessageDispatcher
+{
+
+ protected static final Logger _logger = LoggerFactory.getLogger(FileMessageDispatcher.class);
+
+ protected static Publisher _publisher = null;
+
+ /**
+ * To use this main method you need to specify a path or file to use for input
+ * This class then uses file contents from the dir/file specified to generate
+ * messages to publish
+ * Intended to be a very simple way to get going with publishing using the broker
+ * @param args - must specify one value, the path to file(s) for publisher
+ */
+ public static void main(String[] args)
+ {
+
+ // Check command line args ok - must provide a path or file for us to dispatch
+ if (args.length < 2)
+ {
+ System.out.println("Usage: MultiMessageDispatcher <numberOfMessagesToSend> <topic(true|false)>" + "");
+ }
+ else
+ {
+ boolean topicPublisher = true;
+
+ try
+ {
+ // publish message(s)
+ topicPublisher = new Boolean(args[1]).booleanValue();
+ publish(new Integer(args[0]).intValue(),topicPublisher);
+
+ // Move payload file(s) to archive location as no error
+ FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH));
+ }
+ catch (Exception e)
+ {
+ // log error and exit
+ _logger.error("Error trying to dispatch message: " + e);
+ System.exit(1);
+ }
+ finally
+ {
+
+ cleanup(topicPublisher);
+ }
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Finished dispatching message");
+ }
+
+ System.exit(0);
+ }
+
+ /**
+ * Publish the content of a file or files from a directory as messages
+ * @param numMessages - from main args
+ * @throws javax.jms.JMSException
+ * @throws org.apache.qpid.example.publisher.MessageFactoryException - if cannot create message from file content
+ */
+ public static void publish(int numMessages, boolean topicPublisher) throws JMSException, MessageFactoryException
+ {
+ {
+ // Send the message generated from the payload using the _publisher
+ getPublisher(topicPublisher).sendMessage(numMessages);
+ }
+ }
+
+ /**
+ * Cleanup before exit
+ */
+ public static void cleanup(boolean topicPublisher)
+ {
+ if (getPublisher(topicPublisher) != null)
+ {
+ getPublisher(topicPublisher).cleanup();
+ }
+ }
+
+ /**
+ * @return A Publisher instance
+ */
+ private static Publisher getPublisher(boolean topic)
+ {
+ if (_publisher != null)
+ {
+ return _publisher;
+ }
+
+ if (!topic)
+ {
+ // Create a _publisher
+ _publisher = new Publisher();
+ }
+ else
+ {
+ _publisher = new TopicPublisher();
+ }
+ return _publisher;
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
new file mode 100644
index 0000000000..b5f44557a4
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.publisher;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+
+import javax.jms.*;
+
+import javax.naming.InitialContext;
+
+import org.apache.qpid.example.shared.InitialContextHelper;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+public class Publisher
+{
+ private static final Logger _log = LoggerFactory.getLogger(Publisher.class);
+
+ protected InitialContextHelper _contextHelper;
+
+ protected Connection _connection;
+
+ protected Session _session;
+
+ protected MessageProducer _producer;
+
+ protected String _destinationDir;
+
+ protected String _name = "Publisher";
+
+ protected Destination _destination;
+
+ protected static final String _defaultDestinationDir = "/tmp";
+
+ /**
+ * Creates a Publisher instance using properties from example.properties
+ * See InitialContextHelper for details of how context etc created
+ */
+ public Publisher()
+ {
+ try
+ {
+ //get an initial context from default properties
+ _contextHelper = new InitialContextHelper(null);
+ InitialContext ctx = _contextHelper.getInitialContext();
+
+ //then create a connection using the AMQConnectionFactory
+ AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local");
+ _connection = cf.createConnection();
+
+ _connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println("ExceptionListener caught: " + jmse);
+ //System.exit(0);
+ }
+ });
+
+ //create a transactional session
+ _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ //lookup the example queue and use it
+ //Queue is non-exclusive and not deleted when last consumer detaches
+ _destination = (Queue) ctx.lookup("MyQueue");
+
+ //create a message producer
+ _producer = _session.createProducer(_destination);
+
+ //set destination dir for files that have been processed
+ _destinationDir = _defaultDestinationDir;
+
+ _connection.start();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _log.error("Exception", e);
+ }
+ }
+
+ /**
+ * Creates and sends the number of messages specified in the param
+ */
+ public void sendMessage(int numMessages)
+ {
+ try
+ {
+ TextMessage txtMessage = _session.createTextMessage("msg");
+ for (int i=0;i<numMessages;i++)
+ {
+ sendMessage(txtMessage);
+ _log.info("Sent: " + i);
+ }
+ }
+ catch (JMSException j)
+ {
+ _log.error("Exception in sendMessage" + j);
+ }
+
+
+ }
+
+ /**
+ * Publishes a non-persistent message using transacted session
+ * Note that persistent is the default mode for send - so need to specify for transient
+ */
+ public boolean sendMessage(Message message)
+ {
+ try
+ {
+ //Send message via our producer which is not persistent
+ _producer.send(message, DeliveryMode.PERSISTENT, _producer.getPriority(), _producer.getTimeToLive());
+
+ //commit the message send and close the transaction
+ _session.commit();
+
+ }
+ catch (JMSException e)
+ {
+ //Have to assume our commit failed and rollback here
+ try
+ {
+ _session.rollback();
+ _log.error("JMSException", e);
+ e.printStackTrace();
+ return false;
+ }
+ catch (JMSException j)
+ {
+ _log.error("Unable to rollback publish transaction ",e);
+ return false;
+ }
+ }
+
+ //_log.info(_name + " finished sending message: " + message);
+ return true;
+ }
+
+ /**
+ * Cleanup resources before exit
+ */
+ public void cleanup()
+ {
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.stop();
+ _connection.close();
+ }
+ _connection = null;
+ _producer = null;
+ }
+ catch(Exception e)
+ {
+ _log.error("Error trying to cleanup publisher " + e);
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Exposes session
+ * @return Session
+ */
+ public Session getSession()
+ {
+ return _session;
+ }
+
+ public String getDestinationDir()
+ {
+ return _destinationDir;
+ }
+
+ public void setDestinationDir(String destinationDir)
+ {
+ _destinationDir = destinationDir;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public void setName(String _name) {
+ this._name = _name;
+ }
+}
+
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java
new file mode 100644
index 0000000000..8645e41101
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.publisher;
+
+import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.example.shared.InitialContextHelper;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import javax.jms.*;
+import javax.naming.InitialContext;
+
+/**
+ * Subclass of Publisher which sends messages to a topic destination defined in example.properties
+ */
+public class TopicPublisher extends Publisher
+{
+
+ private static final Logger _log = LoggerFactory.getLogger(Publisher.class);
+
+ public TopicPublisher()
+ {
+ super();
+
+ try
+ {
+ _contextHelper = new InitialContextHelper(null);
+ InitialContext ctx = _contextHelper.getInitialContext();
+
+ //lookup the example topic and use it
+ _destination = (Topic) ctx.lookup("MyTopic");
+
+ //create a message producer
+ _producer = _session.createProducer(_destination);
+ }
+ catch (Exception e)
+ {
+ //argh
+ _log.error("Exception trying to construct TopicPublisher" + e);
+ }
+
+ }
+} \ No newline at end of file
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
new file mode 100644
index 0000000000..245008b68a
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.publisher;
+
+/**
+ * Exception thrown by monitor when cannot send a message marked for immediate delivery
+ */
+public class UndeliveredMessageException extends Exception
+{
+ public UndeliveredMessageException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java
new file mode 100644
index 0000000000..e32ee0ba73
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.example.pubsub;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+/**
+ * An abstract base class that wraps up the creation of a JMS client utilising JNDI
+ */
+public abstract class Client
+{
+ protected ConnectionSetup _setup;
+
+ protected Connection _connection;
+ protected Destination _destination;
+ protected Session _session;
+
+ public Client(String destination)
+ {
+ if (destination == null)
+ {
+ destination = ConnectionSetup.TOPIC_JNDI_NAME;
+ }
+
+ try
+ {
+ _setup = new ConnectionSetup();
+ }
+ catch (NamingException e)
+ {
+ //ignore
+ }
+
+ if (_setup != null)
+ {
+ try
+ {
+ _connection = _setup.getConnectionFactory().createConnection();
+ _destination = _setup.getDestination(destination);
+ }
+ catch (JMSException e)
+ {
+ System.err.println(e.getMessage());
+ }
+ }
+ }
+
+ public abstract void start();
+
+} \ No newline at end of file
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
new file mode 100644
index 0000000000..88ee9ed2c5
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.example.pubsub;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+
+/**
+ * This ConnectionSetup is a wrapper around JNDI it creates a number of entries.
+ *
+ * It is equivalent to a PropertyFile of value:
+ *
+ * connectionfactory.local=amqp://guest:guest@clientid/test?brokerlist='localhost'
+ * connectionfactory.vm=amqp://guest:guest@clientid/test?brokerlist='vm://:1'
+ *
+ * queue.queue=example.MyQueue
+ * topic.topic=example.hierarical.topic
+ *
+ */
+public class ConnectionSetup
+{
+ final static String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+ final static String CONNECTION_JNDI_NAME = "local";
+ final static String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='localhost'";
+
+ public static final String QUEUE_JNDI_NAME = "queue";
+ final static String QUEUE_NAME = "example.MyQueue";
+
+ public static final String TOPIC_JNDI_NAME = "topic";
+ final static String TOPIC_NAME = "usa.news";
+
+ private Context _ctx;
+
+ public ConnectionSetup() throws NamingException
+ {
+
+ // Set the properties ...
+ Properties properties = new Properties();
+ properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
+ properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME);
+ properties.put("connectionfactory." + "vm", "amqp://guest:guest@clientid/test?brokerlist='vm://:1'");
+
+ properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME);
+ properties.put("topic." + TOPIC_JNDI_NAME, TOPIC_NAME);
+ // Create the initial context
+ _ctx = new InitialContext(properties);
+
+ }
+
+ public ConnectionSetup(Properties properties) throws NamingException
+ {
+ _ctx = new InitialContext(properties);
+ }
+
+ public ConnectionFactory getConnectionFactory()
+ {
+
+ // Perform the lookups
+ try
+ {
+ return (ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME);
+ }
+ catch (NamingException e)
+ {
+ //ignore
+ }
+ return null;
+ }
+
+ public Destination getDestination(String jndiName)
+ {
+ // Perform the lookups
+ try
+ {
+ return (Destination) _ctx.lookup(jndiName);
+ }
+ catch (ClassCastException cce)
+ {
+ //ignore
+ }
+ catch (NamingException ne)
+ {
+ //ignore
+ }
+ return null;
+ }
+
+
+ public void close()
+ {
+ try
+ {
+ _ctx.close();
+ }
+ catch (NamingException e)
+ {
+ //ignore
+ }
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
new file mode 100644
index 0000000000..ac3829d49e
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.example.pubsub;
+
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * A simple Publisher example.
+ *
+ * The class can take two arguments.
+ * java Publisher <destination> <msgCount>
+ * Where:
+ * destination is either 'topic' or 'queue' (Default: topic)
+ * msgCount is the number of messages to send (Default : 100)
+ *
+ */
+public class Publisher extends Client
+{
+ int _msgCount;
+
+ public Publisher(String destination, int msgCount)
+ {
+ super(destination);
+ _msgCount = msgCount;
+ }
+
+ public void start()
+ {
+ try
+ {
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer _producer = _session.createProducer(_destination);
+
+ for (int msgCount = 0; msgCount < _msgCount; msgCount++)
+ {
+ _producer.send(_session.createTextMessage("msg:" + msgCount));
+ System.out.println("Sent:" + msgCount);
+ }
+
+ System.out.println("Done.");
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+
+ String destination = args.length > 2 ? args[1] : "usa.news";
+
+ int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100;
+
+ new Publisher(destination, msgCount).start();
+ }
+
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java
new file mode 100644
index 0000000000..f2d736701f
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.example.pubsub;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.util.concurrent.CountDownLatch;
+
+
+/**
+ * Simple client that listens for the specified number of msgs on the given Destinaton
+ *
+ * The class can take two arguments.
+ * java Subscriber <destination> <msgCount>
+ * Where:
+ * destination is either 'topic' or 'queue' (Default: topic)
+ * msgCount is the number of messages to send (Default : 100)
+ */
+public class Subscriber extends Client implements MessageListener
+{
+
+ CountDownLatch _count;
+
+ public Subscriber(String destination, int msgCount)
+ {
+ super(destination);
+ _count = new CountDownLatch(msgCount);
+ }
+
+
+ public void start()
+ {
+ try
+ {
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _session.createDurableSubscriber((Topic) _setup.getDestination(ConnectionSetup.TOPIC_JNDI_NAME),
+ "exampleClient").setMessageListener(this);
+ _connection.start();
+ _count.await();
+
+ System.out.println("Done");
+
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ String destination = args.length > 2 ? args[1] : null;
+ int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100;
+
+ new Subscriber(destination, msgCount).start();
+ }
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ _count.countDown();
+ System.out.println("Received msg:" + ((TextMessage) message).getText());
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
new file mode 100644
index 0000000000..1a3d596a24
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.shared;
+
+public class ConnectionException extends Exception
+{
+ public ConnectionException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
new file mode 100644
index 0000000000..2987a9559b
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.shared;
+
+public class ContextException extends Exception
+{
+ public ContextException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java
new file mode 100644
index 0000000000..54446cb6a7
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.shared;
+
+import java.io.*;
+
+/**
+ * Class that provides file related utility methods for utility use
+ */
+public class FileUtils {
+
+
+ //Reads file content into String
+ public static String getFileContent(String filePath) throws IOException
+ {
+
+ BufferedReader reader = null;
+ String tempData = "";
+ String eol = "\n\r";
+
+ try
+ {
+ String line;
+ reader = new BufferedReader(new FileReader(filePath));
+ while ((line = reader.readLine()) != null)
+ {
+ if (!tempData.equals(""))
+ {
+ tempData = tempData + eol + line;
+ }
+ else
+ {
+ tempData = line;
+ }
+ }
+ }
+ finally
+ {
+ if (reader != null)
+ {
+ reader.close();
+ }
+ }
+ return tempData;
+ }
+
+ /*
+ * Reads xml from a file and returns it as an array of chars
+ */
+ public static char[] getFileAsCharArray(String filePath) throws IOException
+ {
+ BufferedReader reader = null;
+ char[] tempChars = null;
+ String tempData = "";
+
+ try
+ {
+ String line;
+ reader = new BufferedReader(new FileReader(filePath));
+ while ((line = reader.readLine()) != null)
+ {
+ tempData = tempData + line;
+ }
+ tempChars = tempData.toCharArray();
+ }
+ finally
+ {
+ if (reader != null)
+ {
+ reader.close();
+ }
+ }
+ return tempChars;
+ }
+
+ /*
+ * Write String content to filename provided
+ */
+ public static void writeStringToFile(String content, String path) throws IOException
+ {
+
+ BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path)));
+ writer.write(content);
+ writer.flush();
+ writer.close();
+ }
+
+ /*
+ * Allows moving of files to a new dir and preserves the last bit of the name only
+ */
+ public static void moveFileToNewDir(String path, String newDir) throws IOException
+ {
+ //get file name from current path
+ //while more files in dir publish them
+ File pathFile = new File(path);
+ if (pathFile.isDirectory())
+ {
+ File[] files = pathFile.listFiles();
+ for (File file : files)
+ {
+ moveFileToNewDir(file,newDir);
+ }
+ }
+ }
+
+ /*
+ * Allows moving of a file to a new dir and preserves the last bit of the name only
+ */
+ public static void moveFileToNewDir(File fileToMove, String newDir) throws IOException
+ {
+ moveFile(fileToMove,getArchiveFileName(fileToMove,newDir));
+ }
+
+ /*
+ * Moves file from a given path to a new path with String params
+ */
+ public static void moveFile(String fromPath, String dest) throws IOException
+ {
+ moveFile(new File(fromPath),new File(dest));
+ }
+
+ /*
+ * Moves file from a given path to a new path with mixed params
+ */
+ public static void moveFile(File fileToMove, String dest) throws IOException
+ {
+ moveFile(fileToMove,new File(dest));
+ }
+
+ /*
+ * Moves file from a given path to a new path with File params
+ */
+ public static void moveFile(File fileToMove, File dest) throws IOException
+ {
+ fileToMove.renameTo(dest);
+ }
+
+ /*
+ * Deletes a given file
+ */
+ public static void deleteFile(String filePath) throws IOException
+ {
+ new File(filePath).delete();
+ }
+
+ private static String getArchiveFileName(File fileToMove, String archiveDir)
+ {
+ //get file name from current path
+ String fileName = fileToMove.getName();
+ return archiveDir + File.separator + fileName;
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
new file mode 100644
index 0000000000..16a185133a
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.shared;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Class that provides helper methods for JNDI
+ */
+public class InitialContextHelper
+{
+
+ public static final String _defaultPropertiesName = "example.properties";
+ protected Properties _fileProperties;
+ protected InitialContext _initialContext;
+ protected static final Logger _log = LoggerFactory.getLogger(InitialContextHelper.class);
+
+ public InitialContextHelper(String propertiesName) throws ContextException
+ {
+ try
+ {
+ if ((propertiesName == null) || (propertiesName.length() == 0))
+ {
+ propertiesName = _defaultPropertiesName;
+ }
+
+ _fileProperties = new Properties();
+ ClassLoader cl = this.getClass().getClassLoader();
+
+ // NB: Need to change path to reflect package if moving classes around !
+ InputStream is = cl.getResourceAsStream("org/apache/qpid/example/shared/" + propertiesName);
+ _fileProperties.load(is);
+ _initialContext = new InitialContext(_fileProperties);
+ }
+ catch (IOException e)
+ {
+ throw new ContextException(e.toString(), e);
+ }
+ catch (NamingException n)
+ {
+ throw new ContextException(n.toString(), n);
+ }
+ }
+
+ public Properties getFileProperties()
+ {
+ return _fileProperties;
+ }
+
+ public InitialContext getInitialContext()
+ {
+ return _initialContext;
+ }
+
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java
new file mode 100644
index 0000000000..c056f8a7da
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.shared;
+
+/**
+ * Constants used by AMS Publisher/Subscriber classes
+ */
+public class Statics {
+
+ public static final String TOPIC_NAME = "EXAMPLE_TOPIC";
+
+ public static final String QUEUE_NAME = "EXAMPLE_QUEUE";
+
+ public static final String MONITOR_QUEUE_SUFFIX = "_MONITOR";
+
+ public static final String HOST_PROPERTY = "host";
+
+ public static final String PORT_PROPERTY = "port";
+
+ public static final String USER_PROPERTY = "user";
+
+ public static final String PWD_PROPERTY = "pwd";
+
+ public static final String TOPIC_PROPERTY = "topic";
+
+ public static final String QUEUE_PROPERTY = "queue";
+
+ public static final String VIRTUAL_PATH_PROPERTY = "virtualpath";
+
+ public static final String ARCHIVE_PATH = "archivepath";
+
+ public static final String CLIENT_PROPERTY = "client";
+
+ public static final String FILENAME_PROPERTY = "filename";
+
+ public static final String DEFAULT_USER = "guest";
+
+ public static final String DEFAULT_PWD = "guest";
+
+
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
new file mode 100644
index 0000000000..c76acbd8b9
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+
+# use the following property to configure the default connector
+#java.naming.provider.url - ignored.
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.local = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'
+
+# register some queues in JNDI using the form
+# queue.[jndiName] = [physicalName]
+queue.MyQueue = example.MyQueue
+
+# register some topics in JNDI using the form
+# topic.[jndiName] = [physicalName]
+topic.ibmStocks = stocks.nyse.ibm
+topic.MyTopic = example.MyTopic
+
+# Register an AMQP destination in JNDI
+# NOTE: Qpid currently only supports direct,topics and headers
+# destination.[jniName] = [BindingURL]
+destination.direct = direct://amq.direct//directQueue
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
new file mode 100644
index 0000000000..8a0ff88448
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.example.simple.reqresp;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+public class Client implements MessageListener
+{
+ final String BROKER = "localhost";
+
+ final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+ final String CONNECTION_JNDI_NAME = "local";
+ final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'";
+
+ final String QUEUE_JNDI_NAME = "queue";
+ final String QUEUE_NAME = "example.RequestQueue";
+
+
+ private InitialContext _ctx;
+
+ private CountDownLatch _shutdownHook = new CountDownLatch(1);
+
+ public Client()
+ {
+ setupJNDI();
+
+ Connection connection;
+ Session session;
+ Destination responseQueue;
+
+ //Setup the connection. Create producer to sent message and consumer to receive the repsonse.
+ MessageProducer _producer;
+ try
+ {
+ connection = ((ConnectionFactory) lookupJNDI(CONNECTION_JNDI_NAME)).createConnection();
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination requestQueue = (Queue) lookupJNDI(QUEUE_JNDI_NAME);
+
+ closeJNDI();
+
+ //Setup a message _producer to send message to the queue the server is consuming from
+ _producer = session.createProducer(requestQueue);
+
+ //Create a temporary queue that this client will listen for responses on then create a consumer
+ //that consumes message from this temporary queue.
+ responseQueue = session.createTemporaryQueue();
+
+ MessageConsumer responseConsumer = session.createConsumer(responseQueue);
+
+ //Set a listener to asynchronously deal with responses.
+ responseConsumer.setMessageListener(this);
+
+ // Now the connection is setup up start it.
+ connection.start();
+ }
+ catch (JMSException e)
+ {
+ System.err.println("Unable to setup connection, client and producer on broker");
+ return;
+ }
+
+ // Setup the message to send
+ TextMessage txtMessage;
+ try
+ {
+ //Now create the actual message you want to send
+ txtMessage = session.createTextMessage("Request Process");
+
+ //Set the reply to field to the temp queue you created above, this is the queue the server will respond to
+ txtMessage.setJMSReplyTo(responseQueue);
+
+ //Set a correlation ID so when you get a response you know which sent message the response is for
+ //If there is never more than one outstanding message to the server then the
+ //same correlation ID can be used for all the messages...if there is more than one outstanding
+ //message to the server you would presumably want to associate the correlation ID with this message
+
+ txtMessage.setJMSCorrelationID(txtMessage.getJMSMessageID());
+ }
+ catch (JMSException e)
+ {
+ System.err.println("Unable to create message");
+ return;
+
+ }
+
+ try
+ {
+ _producer.send(txtMessage);
+ }
+ catch (JMSException e)
+ {
+ //Handle the exception appropriately
+ }
+
+ try
+ {
+ System.out.println("Sent Request Message ID :" + txtMessage.getJMSMessageID());
+ }
+ catch (JMSException e)
+ {
+ //Handle exception more appropriately.
+ }
+
+ //Wait for the return message to arrive
+ try
+ {
+ _shutdownHook.await();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore this as we are quitting anyway.
+ }
+
+ //Close the connection
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException e)
+ {
+ System.err.println("A problem occured while shutting down the connection : " + e);
+ }
+ }
+
+
+ /**
+ * Implementation of the Message Listener interface.
+ * This is where message will be asynchronously delivered.
+ *
+ * @param message
+ */
+ public void onMessage(Message message)
+ {
+ String messageText;
+ try
+ {
+ if (message instanceof TextMessage)
+ {
+ TextMessage textMessage = (TextMessage) message;
+ messageText = textMessage.getText();
+ System.out.println("messageText = " + messageText);
+ System.out.println("Correlation ID " + message.getJMSCorrelationID());
+
+ _shutdownHook.countDown();
+ }
+ else
+ {
+ System.err.println("Unexpected message delivered");
+ }
+ }
+ catch (JMSException e)
+ {
+ //Handle the exception appropriately
+ }
+ }
+
+ /**
+ * Lookup the specified name in the JNDI Context.
+ *
+ * @param name The string name of the object to lookup
+ *
+ * @return The object or null if nothing exists for specified name
+ */
+ private Object lookupJNDI(String name)
+ {
+ try
+ {
+ return _ctx.lookup(name);
+ }
+ catch (NamingException e)
+ {
+ System.err.println("Error looking up '" + name + "' in JNDI Context:" + e);
+ }
+
+ return null;
+ }
+
+ /**
+ * Setup the JNDI context.
+ *
+ * In this case we are simply using a Properties object to store the pairing information.
+ *
+ * Further details can be found on the wiki site here:
+ *
+ * @see : http://cwiki.apache.org/qpid/how-to-use-jndi.html
+ */
+ private void setupJNDI()
+ {
+ // Set the properties ...
+ Properties properties = new Properties();
+ properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
+ properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME);
+ properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME);
+
+ // Create the initial context
+ Context ctx = null;
+ try
+ {
+ _ctx = new InitialContext(properties);
+ }
+ catch (NamingException e)
+ {
+ System.err.println("Error Setting up JNDI Context:" + e);
+ }
+ }
+
+ /** Close the JNDI Context to keep everything happy. */
+ private void closeJNDI()
+ {
+ try
+ {
+ _ctx.close();
+ }
+ catch (NamingException e)
+ {
+ System.err.println("Unable to close JNDI Context : " + e);
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+ new Client();
+ }
+}
+
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java
new file mode 100644
index 0000000000..9c284eee97
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java
@@ -0,0 +1,236 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.example.simple.reqresp;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.io.BufferedReader;
+import java.io.BufferedInputStream;
+import java.io.Reader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+public class Server implements MessageListener
+{
+ final String BROKER = "localhost";
+
+ final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+ final String CONNECTION_JNDI_NAME = "local";
+ final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'";
+
+ final String QUEUE_JNDI_NAME = "queue";
+ final String QUEUE_NAME = "example.RequestQueue";
+
+
+ private InitialContext _ctx;
+ private Session _session;
+ private MessageProducer _replyProducer;
+ private CountDownLatch _shutdownHook = new CountDownLatch(1);
+
+ public Server()
+ {
+ setupJNDI();
+
+ Connection connection;
+ try
+ {
+ connection = ((ConnectionFactory) lookupJNDI(CONNECTION_JNDI_NAME)).createConnection();
+
+ _session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination requestQueue = (Queue) lookupJNDI(QUEUE_JNDI_NAME);
+
+ closeJNDI();
+
+ //Setup a message producer to respond to messages from clients, we will get the destination
+ //to send to from the JMSReplyTo header field from a Message so we MUST set the destination here to null.
+ this._replyProducer = _session.createProducer(null);
+
+ //Set up a consumer to consume messages off of the request queue
+ MessageConsumer consumer = _session.createConsumer(requestQueue);
+ consumer.setMessageListener(this);
+
+ //Now start the connection
+ connection.start();
+ }
+ catch (JMSException e)
+ {
+ //Handle the exception appropriately
+ System.err.println("JMSException occured setting up server :" + e);
+ return;
+ }
+
+ System.out.println("Server process started and waiting for messages.");
+
+ //Wait to process an single message then quit.
+ while (_shutdownHook.getCount() != 0)
+ {
+ try
+ {
+ _shutdownHook.await();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore this as we are quitting anyway.
+ }
+ }
+
+ //Close the connection
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException e)
+ {
+ System.err.println("A problem occured while shutting down the connection : " + e);
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ TextMessage response = this._session.createTextMessage();
+
+ //Check we have the right message type.
+ if (message instanceof TextMessage)
+ {
+ TextMessage txtMsg = (TextMessage) message;
+ String messageText = txtMsg.getText();
+
+ //Perform the request
+ System.out.println("Received request:" + messageText + " for message :" + message.getJMSMessageID());
+
+ //Set the response back to the client
+ response.setText("Response to Request:" + messageText);
+ }
+
+ //Set the correlation ID from the received message to be the correlation id of the response message
+ //this lets the client identify which message this is a response to if it has more than
+ //one outstanding message to the server
+ response.setJMSCorrelationID(message.getJMSMessageID());
+
+ try
+ {
+ System.out.println("Received message press enter to send response....");
+ new BufferedReader(new InputStreamReader(System.in)).readLine();
+ }
+ catch (IOException e)
+ {
+ //Error attemptying to pause
+ }
+
+ //Send the response to the Destination specified by the JMSReplyTo field of the received message.
+ _replyProducer.send(message.getJMSReplyTo(), response);
+ }
+ catch (JMSException e)
+ {
+ //Handle the exception appropriately
+ }
+
+ _shutdownHook.countDown();
+ }
+
+ /**
+ * Lookup the specified name in the JNDI Context.
+ *
+ * @param name The string name of the object to lookup
+ *
+ * @return The object or null if nothing exists for specified name
+ */
+ private Object lookupJNDI(String name)
+ {
+ try
+ {
+ return _ctx.lookup(name);
+ }
+ catch (NamingException e)
+ {
+ System.err.println("Error looking up '" + name + "' in JNDI Context:" + e);
+ }
+
+ return null;
+ }
+
+ /**
+ * Setup the JNDI context.
+ *
+ * In this case we are simply using a Properties object to store the pairing information.
+ *
+ * Further details can be found on the wiki site here:
+ *
+ * @see : http://cwiki.apache.org/qpid/how-to-use-jndi.html
+ */
+ private void setupJNDI()
+ {
+ // Set the properties ...
+ Properties properties = new Properties();
+ properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
+ properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME);
+ properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME);
+
+ // Create the initial context
+ Context ctx = null;
+ try
+ {
+ _ctx = new InitialContext(properties);
+ }
+ catch (NamingException e)
+ {
+ System.err.println("Error Setting up JNDI Context:" + e);
+ }
+ }
+
+ /** Close the JNDI Context to keep everything happy. */
+ private void closeJNDI()
+ {
+ try
+ {
+ _ctx.close();
+ }
+ catch (NamingException e)
+ {
+ System.err.println("Unable to close JNDI Context : " + e);
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+ new Server();
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
new file mode 100644
index 0000000000..e4eb5ac7f5
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.subscriber;
+
+import org.apache.qpid.example.shared.Statics;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import javax.jms.*;
+/**
+ * Subclass of Subscriber which consumes a heartbeat message
+ */
+
+public class MonitoredSubscriber extends Subscriber
+{
+ protected String _monitorDestinationName;
+
+ private static final Logger _logger = LoggerFactory.getLogger(MonitoredSubscriber.class);
+
+ private MessageConsumer _monitorConsumer;
+
+ public MonitoredSubscriber()
+ {
+ super();
+ //lookup queue name and append suffix
+ _monitorDestinationName = _destination.toString() + Statics.MONITOR_QUEUE_SUFFIX;
+ }
+
+ /**
+ * MessageListener implementation for this subscriber
+ */
+ public static class MonitorMessageListener implements MessageListener
+ {
+ private String _name;
+
+ public MonitorMessageListener(String name)
+ {
+ _name = name;
+
+ }
+
+ /**
+ * Listens for heartbeat messages and acknowledges them
+ * @param message
+ */
+ public void onMessage(javax.jms.Message message)
+ {
+ _logger.info(_name + " monitor got message '" + message + "'");
+
+ try
+ {
+ _logger.debug("Monitor acknowledging recieved message");
+
+ //Now acknowledge the message to clear it from our queue
+ message.acknowledge();
+ }
+ catch(JMSException j)
+ {
+ _logger.error("Monitor caught JMSException trying to acknowledge message receipt");
+ j.printStackTrace();
+ }
+ catch(Exception e)
+ {
+ _logger.error("Monitor caught unexpected exception trying to handle message");
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Subscribes to Queue and attaches additional monitor listener
+ */
+ public void subscribeAndMonitor()
+ {
+ try
+ {
+ _connection = _connectionFactory.createConnection();
+
+ //create a transactional session
+ Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ //Queue is non-exclusive and not deleted when last consumer detaches
+ Destination destination = session.createQueue(_monitorDestinationName);
+
+ //Create a consumer with a destination of our queue which will use defaults for prefetch etc
+ _monitorConsumer = session.createConsumer(destination);
+
+ //give the monitor message listener a name of it's own
+ _monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener
+ ("MonitorListener " + System.currentTimeMillis()));
+
+ MonitoredSubscriber._logger.info("Starting monitored subscription ...");
+
+ _connection.start();
+
+ //and now start ordinary consumption too
+ subscribe();
+ }
+ catch (Throwable t)
+ {
+ _logger.error("Fatal error: " + t);
+ t.printStackTrace();
+ }
+ }
+
+ /**
+ * Stop consuming
+ */
+ public void stopMonitor()
+ {
+ try
+ {
+ _monitorConsumer.close();
+ _monitorConsumer = null;
+ stop();
+ }
+ catch(JMSException j)
+ {
+ _logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace());
+ }
+ }
+
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
new file mode 100644
index 0000000000..5e78107182
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.subscriber;
+
+
+/**
+ * Allows you to simply start a monitored subscriber
+ */
+public class MonitoredSubscriptionWrapper {
+
+ private static MonitoredSubscriber _subscriber;
+
+ /**
+ * Create a monitored subscriber and start it
+ * @param args - no params required
+ */
+ public static void main(String args[])
+ {
+ _subscriber = new MonitoredSubscriber();
+
+ _subscriber.subscribe();
+ }
+
+ /**
+ * Stop subscribing now ...
+ */
+ public static void stop()
+ {
+ _subscriber.stop();
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java
new file mode 100644
index 0000000000..c36668575f
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.subscriber;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.InitialContext;
+
+import org.apache.qpid.example.shared.InitialContextHelper;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Subscriber which consumes messages from a queue
+ */
+
+public class Subscriber
+{
+ private static final Logger _log = LoggerFactory.getLogger(Subscriber.class);
+
+ protected Connection _connection;
+
+ protected MessageConsumer _consumer;
+
+ protected InitialContextHelper _contextHelper;
+
+ protected AMQConnectionFactory _connectionFactory;
+
+ protected Destination _destination;
+
+ public Subscriber()
+ {
+ try
+ {
+ //get an initial context from default properties
+ _contextHelper = new InitialContextHelper(null);
+ InitialContext ctx = _contextHelper.getInitialContext();
+
+ //then create a connection using the AMQConnectionFactory
+ _connectionFactory = (AMQConnectionFactory) ctx.lookup("local");
+
+ //lookup queue from context
+ _destination = (Destination) ctx.lookup("MyQueue");
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _log.error("Exception", e);
+ }
+ }
+
+ /**
+ * Listener class that handles messages
+ */
+ public static class ExampleMessageListener implements MessageListener
+ {
+ private String _name;
+
+ public ExampleMessageListener(String name)
+ {
+ _name = name;
+ }
+
+ /**
+ * Listens for message callbacks, handles and then acknowledges them
+ * @param message - the message received
+ */
+ public void onMessage(javax.jms.Message message)
+ {
+ _log.info(_name + " got message '" + message + "'");
+
+ try
+ {
+ //NB: Handle your message appropriately for your application here
+ //do some stuff
+
+ _log.debug("Acknowledging recieved message");
+
+ //Now acknowledge the message to clear it from our queue
+ message.acknowledge();
+ }
+ catch(JMSException j)
+ {
+ _log.error("JMSException trying to acknowledge message receipt");
+ j.printStackTrace();
+ }
+ catch(Exception e)
+ {
+ _log.error("Unexpected exception trying to handle message");
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Subscribes to example Queue and attaches listener
+ */
+ public void subscribe()
+ {
+ _log.info("Starting subscription ...");
+
+ try
+ {
+ _connection = _connectionFactory.createConnection();
+
+ //Non transactional session using client acknowledgement
+ Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ //Create a consumer with a destination of our queue which will use defaults for prefetch etc
+ _consumer = session.createConsumer(_destination);
+
+ //give the message listener a name of it's own
+ _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis()));
+
+ _connection.start();
+ }
+ catch (Throwable t)
+ {
+ _log.error("Fatal error: " + t);
+ t.printStackTrace();
+ }
+
+ _log.info("Waiting for messages ...");
+
+ //wait for messages and sleep to survive failover
+ try
+ {
+ while(true)
+ {
+ Thread.sleep(Long.MAX_VALUE);
+ }
+ }
+ catch (Exception e)
+ {
+ _log.warn("Exception while Subscriber sleeping",e);
+ }
+ }
+
+ /**
+ * Stop consuming and close connection
+ */
+ public void stop()
+ {
+ try
+ {
+ _consumer.close();
+ _consumer = null;
+ _connection.stop();
+ _connection.close();
+ }
+ catch(JMSException j)
+ {
+ _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace());
+ }
+ }
+
+}
+
+
+
+
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
new file mode 100644
index 0000000000..f8fbf63037
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.subscriber;
+
+/**
+ * Allows you to simply start a subscriber
+ */
+public class SubscriptionWrapper {
+
+ private static Subscriber _subscriber;
+
+ /**
+ * Create a subscriber and start it
+ * @param args
+ */
+ public static void main(String args[])
+ {
+ _subscriber = new Subscriber();
+
+ _subscriber.subscribe();
+ }
+
+ /**
+ * Stop subscribing now ...
+ */
+ public static void stop()
+ {
+ _subscriber.stop();
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
new file mode 100644
index 0000000000..d7eb138523
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
@@ -0,0 +1,171 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.example.transport;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+import java.util.UUID;
+
+/**
+ * This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as
+ * the transport for the Client API.
+ *
+ * The Demo here runs twice:
+ * 1. Just to show a simple publish and receive.
+ * 2. To demonstrate how to use existing sockets and utilise the underlying client failover mechnaism.
+ */
+public class ExistingSocketConnectorDemo implements ConnectionListener
+{
+ private static boolean DEMO_FAILOVER = false;
+
+ public static void main(String[] args) throws IOException, URLSyntaxException, AMQException, JMSException
+ {
+ System.out.println("Testing socket connection to localhost:5672.");
+
+ new ExistingSocketConnectorDemo();
+
+ System.out.println("Testing socket connection failover between localhost:5672 and localhost:5673.");
+
+ DEMO_FAILOVER = true;
+
+ new ExistingSocketConnectorDemo();
+ }
+
+ Connection _connection;
+ MessageProducer _producer;
+ Session _session;
+
+ String Socket1_ID = UUID.randomUUID().toString();
+ String Socket2_ID = UUID.randomUUID().toString();
+
+
+
+ /** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */
+ public final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID + "'";
+
+
+ public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException
+ {
+
+ Socket socket = SocketChannel.open().socket();
+ socket.connect(new InetSocketAddress("localhost", 5672));
+
+ TransportConnection.registerOpenSocket(Socket1_ID, socket);
+
+
+ _connection = new AMQConnection(CONNECTION);
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer = _session.createConsumer(_session.createQueue("Queue"));
+
+ _producer = _session.createProducer(_session.createQueue("Queue"));
+
+ _connection.start();
+
+ if (!DEMO_FAILOVER)
+ {
+ _producer.send(_session.createTextMessage("Simple Test"));
+ }
+ else
+ {
+ // Using the Qpid interfaces we can set a listener that allows us to demonstrate failover
+ ((AMQConnection) _connection).setConnectionListener(this);
+
+ System.out.println("Testing failover: Please ensure second broker running on localhost:5673 and shutdown broker on 5672.");
+ }
+
+ //We do a blocking receive here so that we can demonstrate failover.
+ Message message = consumer.receive();
+
+ System.out.println("Recevied :" + message);
+
+ _connection.close();
+ }
+
+ // ConnectionListener Interface
+
+ public void bytesSent(long count)
+ {
+ //not used in this example
+ }
+ public void bytesReceived(long count)
+ {
+ //not used in this example
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ /**
+ * This method is called before the underlying client library starts to reconnect. This gives us the opportunity
+ * to set a new socket for the failover to occur on.
+ */
+ try
+ {
+ Socket socket = SocketChannel.open().socket();
+
+ socket.connect(new InetSocketAddress("localhost", 5673));
+
+ // This is the new method to pass in an open socket for the connection to use.
+ TransportConnection.registerOpenSocket(Socket2_ID, socket);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ return false;
+ }
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //not used in this example - but must return true to allow the resubscription of existing clients.
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ // Now that failover has completed we can send a message that the receiving thread will pick up
+ try
+ {
+ _producer.send(_session.createTextMessage("Simple Failover Test"));
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/runSample.sh b/qpid/java/client/example/src/main/java/runSample.sh
new file mode 100755
index 0000000000..66338556a5
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/runSample.sh
@@ -0,0 +1,72 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# Work out the CLASSPATH divider
+UNAME=`uname -s`
+case $UNAME in
+ CYGWIN*)
+ DIVIDER=";"
+ ;;
+ *)
+ DIVIDER=":"
+;;
+esac
+
+if test "'x$QPID_HOME'" != "'x'"
+then
+ QPID_HOME=$QPID_HOME
+else
+ QPID_HOME="/usr/share/java/"
+fi
+echo "Using QPID_HOME: $QPID_HOME"
+
+if test "'x$QPID_SAMPLE'" != "'x'"
+then
+ QPID_SAMPLE=$QPID_SAMPLE
+else
+ QPID_SAMPLE=$PWD
+fi
+echo "Using QPID_SAMPLE: $QPID_SAMPLE"
+
+
+# set the CLASSPATH
+CLASSPATH=`find "$QPID_HOME" -name '*.jar' | tr '\n' "$DIVIDER"`
+
+
+# compile the samples
+javac -cp "$CLASSPATH" -sourcepath "$QPID_SAMPLE" -d . `find $QPID_SAMPLE -name '*.java'`
+
+# Add output classes to CLASSPATH
+CLASSPATH="$CLASSPATH$DIVIDER$."
+
+# Set VM parameters
+QPID_PARAM="$QPID_PARAM -Dlog4j.configuration=file://$PWD/log4j.xml"
+
+
+# Check if the user supplied a sample classname
+if test "'x$1'" = "'x'"
+then
+ echo "No sample classname specified"
+ exit;
+else
+ java -cp $CLASSPATH $QPID_PARAM $*
+fi