diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-05-26 23:41:46 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-05-26 23:41:46 +0000 |
commit | 2d92397a1209bf4c4645d76e3104abd1e58a2d66 (patch) | |
tree | 5eaafa517d0e9eb02430c26a3df52bfbfe421a9a /java | |
parent | 248037126b68039ae3c0f4da6eef3b87d9e02f7f (diff) | |
download | qpid-python-2d92397a1209bf4c4645d76e3104abd1e58a2d66.tar.gz |
Added examples to match the c++ and python clients.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@948636 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
7 files changed, 805 insertions, 0 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/Drain.java b/java/client/example/src/main/java/org/apache/qpid/example/Drain.java new file mode 100644 index 0000000000..51491326c4 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/Drain.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; +import org.slf4j.Logger; + +public class Drain extends OptionParser +{ + + static final Option FOREVER = new Option("f", + "forever", + "ignore timeout and wait forever", + null, + null, + Boolean.class); + + static + { + optDefs.add(BROKER); + optDefs.add(HELP); + optDefs.add(TIMEOUT); + optDefs.add(FOREVER); + optDefs.add(CON_OPTIONS); + optDefs.add(BROKER_OPTIONS); + } + + public Drain(String[] args, String usage, String desc) throws Exception + { + super(args, usage, desc); + + Connection con = createConnection(); + con.start(); + Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); + Destination dest = new AMQAnyDestination(address); + MessageConsumer consumer = ssn.createConsumer(dest); + Message msg; + + long timeout = -1; + if (containsOp(TIMEOUT)) { timeout = Integer.parseInt(getOp(TIMEOUT))*1000; } + if (containsOp(FOREVER)) { timeout = 0; } + + while ((msg = consumer.receive(timeout)) != null) + { + System.out.println("\n------------- Msg -------------"); + System.out.println(msg); + System.out.println("-------------------------------\n"); + } + + ssn.close(); + con.close(); + } + + public static void main(String[] args) throws Exception + { + String u = "Usage: drain [OPTIONS] 'ADDRESS'"; + String d = "Drains messages from the specified address."; + + Drain drain = new Drain(args,u,d); + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/Hello.java b/java/client/example/src/main/java/org/apache/qpid/example/Hello.java new file mode 100644 index 0000000000..949ee4dac6 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/Hello.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.example; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Properties; + + +public class Hello +{ + + public Hello() + { + } + + public static void main(String[] args) + { + Hello hello = new Hello(); + hello.runTest(); + } + + private void runTest() + { + try { + Properties properties = new Properties(); + properties.load(this.getClass().getResourceAsStream("hello.properties")); + Context context = new InitialContext(properties); + + ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionfactory"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = (Destination) context.lookup("topicExchange"); + + MessageProducer messageProducer = session.createProducer(destination); + MessageConsumer messageConsumer = session.createConsumer(destination); + + TextMessage message = session.createTextMessage("Hello world!"); + messageProducer.send(message); + + message = (TextMessage)messageConsumer.receive(); + System.out.println(message.getText()); + + connection.close(); + context.close(); + } + catch (Exception exp) + { + exp.printStackTrace(); + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java b/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java new file mode 100644 index 0000000000..89db04f8d3 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/MapReceiver.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.example; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + + +public class MapReceiver { + + public static void main(String[] args) throws Exception + { + Connection connection = + new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'"); + + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}"); + MessageConsumer consumer = session.createConsumer(queue); + + MapMessage m = (MapMessage)consumer.receive(); + System.out.println(m); + connection.close(); + } + +}
\ No newline at end of file diff --git a/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java b/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java new file mode 100644 index 0000000000..0ce9383add --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/MapSender.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.example; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + + +public class MapSender { + + public static void main(String[] args) throws Exception + { + Connection connection = + new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'"); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}"); + MessageProducer producer = session.createProducer(queue); + + MapMessage m = session.createMapMessage(); + m.setIntProperty("Id", 987654321); + m.setStringProperty("name", "Widget"); + m.setDoubleProperty("price", 0.99); + + List<String> colors = new ArrayList<String>(); + colors.add("red"); + colors.add("green"); + colors.add("white"); + m.setObject("colours", colors); + + Map<String,Double> dimensions = new HashMap<String,Double>(); + dimensions.put("length",10.2); + dimensions.put("width",5.1); + dimensions.put("depth",2.0); + m.setObject("dimensions",dimensions); + + List<List<Integer>> parts = new ArrayList<List<Integer>>(); + parts.add(Arrays.asList(new Integer[] {1,2,5})); + parts.add(Arrays.asList(new Integer[] {8,2,5})); + m.setObject("parts", parts); + + Map<String,Object> specs = new HashMap<String,Object>(); + specs.put("colours", colors); + specs.put("dimensions", dimensions); + specs.put("parts", parts); + m.setObject("specs",specs); + + producer.send(m); + connection.close(); + } + +}
\ No newline at end of file diff --git a/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java b/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java new file mode 100644 index 0000000000..6b1f514258 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java @@ -0,0 +1,335 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.Connection; + +import org.apache.qpid.client.AMQConnection; + +public class OptionParser +{ + static final Option BROKER = new Option("b", + "broker", + "connect to specified broker", + "USER:PASS@HOST:PORT", + "guest:guest@localhost:5672", + String.class); + + static final Option HELP = new Option("h", + "help", + "show this help message and exit", + null, + null, + Boolean.class); + + static final Option TIMEOUT = new Option("t", + "timeout", + "timeout in seconds to wait before exiting", + "TIMEOUT", + "0", + Integer.class); + + static final Option CON_OPTIONS = new Option(null, + "con-option", + "JMS Connection URL options. Ex sync_ack=true sync_publish=all ", + "NAME=VALUE", + null, + String.class); + + + static final Option BROKER_OPTIONS = new Option(null, + "broker-option", + "JMS Broker URL options. Ex ssl=true sasl_mechs=GSSAPI ", + "NAME=VALUE", + null, + String.class); + + + protected Map<String,Object> optMap = new HashMap<String,Object>(); + protected static List<Option> optDefs = new ArrayList<Option>(); + + protected String usage; + protected String desc; + protected String address; + + public OptionParser(String[] args, String usage, String desc) + { + this.usage = usage; + this.desc = desc; + + if (args.length == 0 || + (args.length == 1 && (args[0].equals("-h") || args[0].equals("--help")))) + { + printHelp(); + } + + address = args[args.length -1]; + String[] ops = new String[args.length -1]; + System.arraycopy(args, 0, ops, 0, ops.length); + parseOpts(ops); + + System.out.println(optMap); + + if (isHelp()) + { + printHelp(); + } + } + + public boolean isHelp() + { + return optMap.containsKey("h") || optMap.containsKey("help"); + } + + public void printHelp() + { + System.out.println(String.format("%s\n",usage)); + System.out.println(String.format("%s\n",desc)); + System.out.println(String.format("%s\n","Options:")); + + for (Option op : optDefs) + { + String valueLabel = op.getValueLabel() != null ? "=" + op.getValueLabel() : ""; + String shortForm = op.getShortForm() != null ? "-" + op.getShortForm() + valueLabel : ""; + String longForm = op.getLongForm() != null ? "--" + op.getLongForm() + valueLabel : ""; + String desc = op.getDesc(); + String defaultValue = op.getDefaultValue() != null ? + " (default " + op.getDefaultValue() + ")" : ""; + + if (!shortForm.equals("")) + { + longForm = shortForm + ", " + longForm; + } + System.out.println( + String.format("%-54s%s%s", longForm,desc,defaultValue)); + } + + System.exit(0); + } + + private void parseOpts(String[] args) + { + String prevOpt = null; + for(String op: args) + { + // covers both -h and --help formats + if (op.startsWith("-")) + { + String key = op.substring(op.startsWith("--")? 2:1 , + (op.indexOf('=') > 0) ? + op.indexOf('='): + op.length()); + + boolean match = false; + for (Option option: optDefs) + { + + if ((op.startsWith("-") && option.shortForm != null && option.shortForm.equals(key)) || + (op.startsWith("--") && option.longForm != null && option.longForm.equals(key)) ) + { + match = true; + break; + } + } + + if (!match) + { + System.out.println(op + " is not a valid option"); + System.exit(0); + } + + if (op.indexOf('=') > 0) + { + String val = extractValue(op.substring(op.indexOf('=')+1)); + if (optMap.containsKey(key)) + { + optMap.put(key, optMap.get(key) + "," + val); + } + else + { + optMap.put(key, val); + } + } + else + { + if (! optMap.containsKey(key)){ optMap.put(key, ""); } + prevOpt = key; + } + } + else if (prevOpt != null) // this is to catch broker localhost:5672 instead broker=localhost:5672 + { + String val = extractValue(op); + if (optMap.containsKey(prevOpt) && !optMap.get(prevOpt).toString().equals("")) + { + optMap.put(prevOpt, optMap.get(prevOpt) + "," + val); + } + else + { + optMap.put(prevOpt, val); + } + prevOpt = null; + } + else + { + System.out.println(optMap); + throw new IllegalArgumentException(op + " is not a valid option"); + } + } + } + + private String extractValue(String op) + { + if (op.startsWith("'")) + { + if (!op.endsWith("'")) + throw new IllegalArgumentException(" The option " + op + " needs to be inside quotes"); + + return op.substring(1,op.length() -1); + } + else + { + return op; + } + } + + protected boolean containsOp(Option op) + { + return optMap.containsKey(op.shortForm) || optMap.containsKey(op.longForm); + } + + protected String getOp(Option op) + { + if (optMap.containsKey(op.shortForm)) + { + return (String)optMap.get(op.shortForm); + } + else if (optMap.containsKey(op.longForm)) + { + return (String)optMap.get(op.longForm); + } + else + { + return op.getDefaultValue(); + } + } + + protected Connection createConnection() throws Exception + { + StringBuffer buf; + buf = new StringBuffer(); + buf.append("amqp://"); + String userPass = "guest:guest"; + String broker = "localhost:5672"; + if(containsOp(BROKER)) + { + try + { + String b = getOp(BROKER); + userPass = b.substring(0,b.indexOf('@')); + broker = b.substring(b.indexOf('@')+1); + } + catch (StringIndexOutOfBoundsException e) + { + Exception ex = new Exception("Error parsing broker string " + getOp(BROKER)); + ex.initCause(e); + throw ex; + } + + } + + if(containsOp(BROKER_OPTIONS)) + { + String bOps = getOp(BROKER_OPTIONS); + bOps = bOps.replaceAll(",", "'&"); + bOps = bOps.replaceAll("=", "='"); + broker = broker + "?" + bOps + "'"; + } + buf.append(userPass); + buf.append("@test/test?brokerlist='tcp://"); + buf.append(broker).append("'"); + if(containsOp(CON_OPTIONS)) + { + String bOps = getOp(CON_OPTIONS); + bOps = bOps.replaceAll(",", "'&"); + bOps = bOps.replaceAll("=", "='"); + buf.append("&").append(bOps).append("'"); + } + + Connection con = new AMQConnection(buf.toString()); + return con; + } + + static class Option + { + private String shortForm; + private String longForm; + private String desc; + private String valueLabel; + private String defaultValue; + private Class type; + + public Option(String shortForm, String longForm, String desc, + String valueLabel, String defaultValue, Class type) + { + this.shortForm = shortForm; + this.longForm = longForm; + this.defaultValue = defaultValue; + this.type = type; + this.desc = desc; + this.valueLabel = valueLabel; + } + + public String getShortForm() + { + return shortForm; + } + + public String getLongForm() + { + return longForm; + } + + public String getDefaultValue() + { + return defaultValue; + } + + public Class getType() + { + return type; + } + + public String getDesc() + { + return desc; + } + + public String getValueLabel() + { + return valueLabel; + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/Spout.java b/java/client/example/src/main/java/org/apache/qpid/example/Spout.java new file mode 100644 index 0000000000..e831df8d28 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/Spout.java @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQAnyDestination; + +public class Spout extends OptionParser +{ + + static final Option COUNT = new Option("c", + "count", + "stop after count messages have been sent, zero disables", + "COUNT", + "1", + Integer.class); + + static final Option ID = new Option("i", + "id", + "use the supplied id instead of generating one", + null, + null, + Boolean.class); + + static final Option CONTENT = new Option(null, + "content", + "specify textual content", + "TEXT", + null, + Boolean.class); + + static final Option MSG_PROPERTY = new Option("P", + "property", + "specify message property", + "NAME=VALUE", + null, + Boolean.class); + + static final Option MAP_ENTRY = new Option("M", + "map", + "specify entry for map content", + "KEY=VALUE", + null, + Boolean.class); + + static + { + optDefs.add(BROKER); + optDefs.add(HELP); + optDefs.add(TIMEOUT); + optDefs.add(COUNT); + optDefs.add(MSG_PROPERTY); + optDefs.add(MAP_ENTRY); + optDefs.add(CONTENT); + optDefs.add(CON_OPTIONS); + optDefs.add(BROKER_OPTIONS); + } + + public Spout(String[] args, String usage, String desc) throws Exception + { + super(args, usage, desc); + + Connection con = createConnection(); + con.start(); + Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); + Destination dest = new AMQAnyDestination(address); + MessageProducer producer = ssn.createProducer(dest); + + int count = Integer.parseInt(getOp(COUNT)); + + for (int i=0; i < count; i++) + { + Message msg = createMessage(ssn); + producer.send(msg); + System.out.println("\n------------- Msg -------------"); + System.out.println(msg); + System.out.println("-------------------------------\n"); + } + ssn.close(); + con.close(); + } + + private Message createMessage(Session ssn) throws Exception + { + if (containsOp(MAP_ENTRY)) + { + MapMessage msg = ssn.createMapMessage(); + for (String pair: getOp(MAP_ENTRY).split(",")) + { + msg.setString(pair.substring(0, pair.indexOf('=')), + pair.substring(pair.indexOf('=') + 1)); + } + setProperties(msg); + return msg; + } + else + { + Message msg = + ssn.createTextMessage(containsOp(CONTENT) ? getOp(CONTENT) : ""); + setProperties(msg); + return msg; + } + } + + private void setProperties(Message m) throws Exception + { + if(containsOp(MSG_PROPERTY)) + { + for (String pair: getOp(MSG_PROPERTY).split(",")) + { + m.setStringProperty(pair.substring(0, pair.indexOf('=')), + pair.substring(pair.indexOf('=') + 1)); + } + } + } + + public static void main(String[] args) throws Exception + { + String u = "Usage: spout [OPTIONS] 'ADDRESS'"; + String d = "Send messages to the specified address."; + + Spout drain = new Spout(args,u,d); + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/hello.properties b/java/client/example/src/main/java/org/apache/qpid/example/hello.properties new file mode 100644 index 0000000000..27ea66b318 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/hello.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' + +# Register an AMQP destination in JNDI +# destination.[jniName] = [Address Format] +destination.topicExchange = amq.topic |