diff options
18 files changed, 346 insertions, 748 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/ArgProcessor.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/ArgProcessor.java deleted file mode 100644 index 00ff092f8a..0000000000 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/ArgProcessor.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.jmsexample.common; - -import java.util.Enumeration; -import java.util.Properties; - -/** - * Command-line argument processor. - */ -public class ArgProcessor -{ - /** Textual representation of program using parser. */ - private String _id; - - /** Command line arguments to parse. */ - private String[] _argv; - - /** Properties table mapping argument name to description */ - private Properties _options; - - /** Properties table mapping argument name to default value */ - private Properties _defaults; - - /** Properties table containing parsed arguments */ - private Properties _parsed; - - /** - * Create an argument parser and parse the supplied arguments. If the arguments - * cannot be parsed (or the argument <code>-help</code> is supplied) then - * exit - * - * @param id textual representation of program identity using parser. - * @param argv the argument array. - * @param options list of allowable options stored in the keys. - * @param defaults list of option default values keyed on option name. - */ - public ArgProcessor(String id, String[] argv, Properties options, Properties defaults) - { - _id = id; - _argv = argv; - _options = options; - _defaults = defaults; - // Try to parse. If we can't then exit - if (!parse()) - { - System.exit(0); - } - } - - /** - * Get the processed arguments. - * @return Properties table mapping argument name to current value, eg, ["-foo", "MyFoo"]. - */ - public Properties getProcessedArgs() - { - return _parsed; - } - - /** - * Display the current property settings on the supplied stream. - * Output sent to <code>System.out</code>. - */ - public void display() - { - System.out.println(_id + " current settings:"); - Enumeration optionEnumeration = _options.keys(); - while (optionEnumeration.hasMoreElements()) - { - String option = (String) optionEnumeration.nextElement(); - String description = (String) _options.get(option); - String currentValue = (String) _parsed.get(option); - if (currentValue != null) - { - System.out.println("\t" + description + " = " + currentValue); - } - } - System.out.println(); - } - - /** - * Get the value of the specified option as a String. - * @param option the option to query. - * @return the value of the option. - */ - public String getStringArgument(String option) - { - return _parsed.getProperty(option); - } - - /** - * Get the value of the specified option as an integer. - * @param option the option to query. - * @return the value of the option. - */ - public int getIntegerArgument(String option) - { - String value = _parsed.getProperty(option); - return Integer.parseInt(value); - } - - /** - * Get the value of the specified option as a boolean. - * @param option the option to query. - * @return the value of the option. - */ - public boolean getBooleanArgument(String option) - { - String value = _parsed.getProperty(option); - return Boolean.valueOf(value); - } - - /** - * Parse the arguments. - * @return true if parsed. - */ - private boolean parse() - { - boolean parsed = false; - _parsed = new Properties(); - if ((_argv.length == 1) && (_argv[0].equalsIgnoreCase("-help"))) - { - displayHelp(); - } - else - { - // Parse argv looking for options putting the results in results - for (int i = 0; i < _argv.length; i++) - { - String arg = _argv[i]; - if (arg.equals("-help")) - { - continue; - } - if (!arg.startsWith("-")) - { - System.err.println(_id + ": unexpected argument: " + arg); - } - else - { - if (_options.containsKey(arg)) - { - if (i == _argv.length - 1 || _argv[i + 1].startsWith("-")) - { - System.err.println(_id + ": missing value argument for: " + arg); - } - else - { - _parsed.put(arg, _argv[++i]); - } - } - else - { - System.err.println(_id + ": unrecognised option: " + arg); - } - } - } - - // Now add the default values if none have been specified in aggv - Enumeration optionsEnum = _options.keys(); - while (optionsEnum.hasMoreElements()) - { - String option = (String) optionsEnum.nextElement(); - - if (_parsed.get(option) == null) - { - String defaultValue = (String) _defaults.get(option); - - if (defaultValue != null) - { - _parsed.put(option, defaultValue); - } - } - } - parsed = true; - } - return parsed; - } - - /** - * Display all options with descriptions and default values (if specified). - * Output is sent to <code>System.out</code>. - */ - private void displayHelp() - { - System.out.println(_id + " available options:"); - Enumeration optionEnumeration = _options.keys(); - while (optionEnumeration.hasMoreElements()) - { - String option = (String) optionEnumeration.nextElement(); - String value = (String) _options.get(option); - String defaultValue = (String) _defaults.get(option); - if (defaultValue != null) - { - System.out.println("\t" + option + " <" + value + "> [" + defaultValue + "]"); - } - else - { - System.out.println("\t" + option + " <" + value + ">"); - } - } - } -} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/BaseExample.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/BaseExample.java deleted file mode 100644 index 78aead5913..0000000000 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/BaseExample.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.jmsexample.common; - -import javax.jms.*; -import javax.naming.Context; -import javax.naming.InitialContext; -import java.util.Hashtable; -import java.util.Properties; - -import org.apache.qpid.example.jmsexample.common.ArgProcessor; - -/** - * Abstract base class for providing common argument parsing features. - * <p/> - * <p>Classes that extend BaseExample support the following command-line arguments:</p> - * <table> - * <tr><td>-factoryName</td> <td>ConnectionFactory name</td></tr> - * <tr><td>-delMode</td> <td>Delivery mode [persistent | non-persistent]</td></tr> - * <tr><td>-numMessages</td> <td>Number of messages to process</td></tr> - * </table> - */ - -abstract public class BaseExample -{ - /* The AMQP INITIAL_CONTEXT_FACTORY */ - private static final String INITIAL_CONTEXT_FACTORY_NAME= - "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; - - /* Default connection factory name. */ - private static final String DEFAULT_CONNECTION_FACTORY_NAME="ConnectionFactory"; - - /* Default number of messages to process. */ - private static final int DEFAULT_NUMBER_MESSAGES=10; - - /* JNDI provider URL. */ - private String _providerURL; - - /* Number of messages to process. */ - private int _numberMessages; - - /* The delivery Mode */ - private int _deliveryMode; - - /* The argument processor */ - protected ArgProcessor _argProcessor; - - /* The supported properties */ - protected static Properties _options=new Properties(); - - /* The properties default values */ - protected static Properties _defaults=new Properties(); - - /* The broker communication objects */ - private InitialContext _initialContext; - private ConnectionFactory _connectionFactory; - - /** - * Protected constructor to create a example client. - * - * @param Id Identity string used in log messages, for example, the name of the example program. - * @param args String array of arguments. - */ - protected BaseExample(String Id, String[] args) - { - _options.put("-factoryName", "ConnectionFactory name"); - _defaults.put("-factoryName", DEFAULT_CONNECTION_FACTORY_NAME); - _options.put("-providerURL", "JNDI Provider URL"); - _options.put("-deliveryMode", "Delivery mode [persistent | non-persistent]"); - _defaults.put("-deliveryMode", "non-persistent"); - _options.put("-numMessages", "Number of messages to process"); - _defaults.put("-numMessages", String.valueOf(DEFAULT_NUMBER_MESSAGES)); - - _argProcessor=new ArgProcessor(Id, args, _options, _defaults); - _argProcessor.display(); - //Set the initial context factory - _providerURL=_argProcessor.getStringArgument("-providerURL"); - // Set the number of messages - _numberMessages=_argProcessor.getIntegerArgument("-numMessages"); - // Set the delivery mode - _deliveryMode=_argProcessor.getStringArgument("-deliveryMode") - .equals("persistent") ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; - } - - /** - * Get the DeliveryMode to use when publishing messages. - * - * @return The delivery mode, either javax.jms.DeliveryMode.NON_PERSISTENT - * or javax.jms.DeliveryMode.PERSISTENT. - */ - protected int getDeliveryMode() - { - return _deliveryMode; - } - - /** - * Get the number of messages to be used. - * - * @return the number of messages to be used. - */ - protected int getNumberMessages() - { - return _numberMessages; - } - - - /** - * Get the JNDI provider URL. - * - * @return the JNDI provider URL. - */ - private String getProviderURL() - { - return _providerURL; - } - - /** - * we assume that the environment is correctly set - * i.e. -Djava.naming.provider.url="..//example.properties" - * - * @return An initial context - * @throws Exception if there is an error getting the context - */ - public InitialContext getInitialContext() throws Exception - { - if (_initialContext == null) - { - Hashtable<String, String> jndiEnvironment=new Hashtable<String, String>(); - jndiEnvironment.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY_NAME); - if (getProviderURL() != null) - { - jndiEnvironment.put(Context.PROVIDER_URL, getProviderURL()); - } - else - { - jndiEnvironment.put("connectionfactory.ConnectionFactory", - "qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672"); - } - _initialContext=new InitialContext(jndiEnvironment); - } - return _initialContext; - } - - /** - * Get a connection factory for the currently used broker - * - * @return A conection factory - * @throws Exception if there is an error getting the tactory - */ - public ConnectionFactory getConnectionFactory() throws Exception - { - if (_connectionFactory == null) - { - _connectionFactory=(ConnectionFactory) getInitialContext().lookup(DEFAULT_CONNECTION_FACTORY_NAME); - } - return _connectionFactory; - } - - /** - * Get a connection (remote or in-VM) - * - * @return a newly created connection - * @throws Exception if there is an error getting the connection - */ - public Connection getConnection() throws Exception - { - return getConnectionFactory().createConnection("guest", "guest"); - } -} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java index f30136242d..f84b16f485 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java @@ -46,15 +46,10 @@ public class Consumer */ private static final String CLASS = "Consumer"; - /** - * Create a Consumer client. - */ - public Consumer() - { - } /** * Run the message consumer example. + * @param args Command line arguments. */ public static void main(String[] args) { @@ -80,7 +75,7 @@ public class Consumer Destination destination = (Destination)ctx.lookup("directQueue"); // Lookup the connection factory - ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("local"); + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); // create the connection Connection connection = conFac.createConnection(); @@ -119,7 +114,7 @@ public class Consumer while (!end) { message = messageConsumer.receive(); - String text = ""; + String text; if (message instanceof TextMessage) { text = ((TextMessage) message).getText(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java index 691bea3a33..d2e1180c9b 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java @@ -61,15 +61,10 @@ public class Listener implements MessageListener */ private static boolean _failed = false; - /** - * Create an Listener client. - */ - public Listener() - { - } /** * Run the message consumer example. + * @param args Command line arguments. */ public static void main(String[] args) { @@ -95,7 +90,7 @@ public class Listener implements MessageListener Destination destination = (Destination)ctx.lookup("directQueue"); // Lookup the connection factory - ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("local"); + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); // create the connection Connection connection = conFac.createConnection(); @@ -174,7 +169,7 @@ public class Listener implements MessageListener { try { - String text = ""; + String text; if (message instanceof TextMessage) { text = ((TextMessage) message).getText(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java index d42a0d8788..9cbd40f9ea 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java @@ -52,6 +52,7 @@ public class Producer /** * Run the message producer example. + * @param args Command line arguments. */ public static void main(String[] args) { @@ -75,7 +76,7 @@ public class Producer Destination destination = (Destination)ctx.lookup("directQueue"); // Lookup the connection factory - ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("local"); + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); // create the connection Connection connection = conFac.createConnection(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties index 1d428d26d5..cc465e9251 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties @@ -18,14 +18,10 @@ # java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory -# use the following property to configure the default connector -#java.naming.provider.url - ignored. - # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.local = qpid:password=pass;username=name@tcp:localhost:5672 +connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 # Register an AMQP destination in JNDI -# NOTE: Qpid currently only supports direct,topics and headers # destination.[jniName] = [BindingURL] destination.directQueue = direct://amq.direct//message_queue?routingkey='routing_key'
\ No newline at end of file diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java index 1cdd2d941a..e813c0ce60 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java @@ -49,7 +49,6 @@ public class Consumer /** * Create a Consumer client. * - * @param args Command line arguments. */ public Consumer() { @@ -62,8 +61,6 @@ public class Consumer */ public static void main(String[] args) { - //_options.put("-queueName", "Queue name"); - //_defaults.put("-queueName", "message_queue"); Consumer syncConsumer = new Consumer(); syncConsumer.runTest(); } @@ -77,16 +74,16 @@ public class Consumer { // Load JNDI properties Properties properties = new Properties(); - properties.load(this.getClass().getResourceAsStream("direct.properties")); + properties.load(this.getClass().getResourceAsStream("fanout.properties")); //Create the initial context Context ctx = new InitialContext(properties); // look up destination - Destination destination = (Destination)ctx.lookup("directQueue"); + Destination destination = (Destination)ctx.lookup("fanoutQueue"); // Lookup the connection factory - ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("local"); + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); // create the connection Connection connection = conFac.createConnection(); @@ -125,7 +122,7 @@ public class Consumer while (!end) { message = messageConsumer.receive(); - String text = ""; + String text; if (message instanceof TextMessage) { text = ((TextMessage) message).getText(); @@ -157,6 +154,7 @@ public class Consumer } catch (Exception exp) { + exp.printStackTrace(); System.err.println(CLASS + ": Caught an Exception: " + exp); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java index d7d2956dbb..4d645888a3 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java @@ -22,8 +22,6 @@ package org.apache.qpid.example.jmsexample.fanout; import java.util.Properties; -import org.apache.qpid.example.jmsexample.common.BaseExample; - import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; @@ -53,16 +51,7 @@ public class Listener implements MessageListener */ private static boolean _failed = false; - /** - * Create an Listener client. - * - * @param args Command line arguments. - */ - public Listener() - { - //super(CLASS, args); - //_queueName = _argProcessor.getStringArgument("-queueName"); - } + /** * Run the message consumer example. @@ -71,8 +60,6 @@ public class Listener implements MessageListener */ public static void main(String[] args) { - //_options.put("-queueName", "Queue name"); - //_defaults.put("-queueName", "message_queue"); Listener listener = new Listener(); listener.runTest(); } @@ -85,15 +72,15 @@ public class Listener implements MessageListener try { Properties properties = new Properties(); - properties.load(this.getClass().getResourceAsStream("direct.properties")); + properties.load(this.getClass().getResourceAsStream("fanout.properties")); //Create the initial context Context ctx = new InitialContext(properties); - Destination destination = (Destination)ctx.lookup("directQueue"); + Destination destination = (Destination)ctx.lookup("fanoutQueue"); // Declare the connection - ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("local"); + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); Connection connection = conFac.createConnection(); // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection @@ -171,7 +158,7 @@ public class Listener implements MessageListener { try { - String text = ""; + String text; if (message instanceof TextMessage) { text = ((TextMessage) message).getText(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java index c917f6d753..6438e74ac7 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java @@ -22,9 +22,6 @@ package org.apache.qpid.example.jmsexample.fanout; import java.util.Properties; -import org.apache.qpid.example.jmsexample.common.BaseExample; - - import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; @@ -38,19 +35,9 @@ public class Producer private static final String CLASS = "Producer"; /* The queue name */ - private String _queueName; private int numMessages = 10; private short deliveryMode = 0; - /** - * Create a Producer client. - * @param args Command line arguments. - */ - public Producer () - { - // super(CLASS, args); - //_queueName = _argProcessor.getStringArgument("-queueName"); - } /** * Run the message producer example. @@ -58,8 +45,6 @@ public class Producer */ public static void main(String[] args) { - // _options.put("-queueName", "Queue name"); - // _defaults.put("-queueName", "message_queue"); Producer producer = new Producer(); producer.runTest(); } @@ -70,15 +55,15 @@ public class Producer { Properties properties = new Properties(); - properties.load(this.getClass().getResourceAsStream("direct.properties")); + properties.load(this.getClass().getResourceAsStream("fanout.properties")); //Create the initial context Context ctx = new InitialContext(properties); - Destination destination = (Destination)ctx.lookup("directQueue"); + Destination destination = (Destination)ctx.lookup("fanoutQueue"); // Declare the connection - ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("local"); + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); Connection connection = conFac.createConnection(); // Create a session on the connection diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties index 446327d7e1..bde9ae4dae 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties @@ -18,14 +18,11 @@ # java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory -# use the following property to configure the default connector -#java.naming.provider.url - ignored. # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.local = qpid:password=pass;username=name@tcp:localhost:5672 +connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 # Register an AMQP destination in JNDI -# NOTE: Qpid currently only supports direct,topics and headers # destination.[jniName] = [BindingURL] -destination.directQueue = fanout://amq.fanout//message_queue
\ No newline at end of file +destination.fanoutQueue = fanout://amq.fanout//message_queue
\ No newline at end of file diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java index 223e7bffd2..196f7c8245 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java @@ -17,38 +17,34 @@ */ package org.apache.qpid.example.jmsexample.pubsub; -import org.apache.qpid.example.jmsexample.common.BaseExample; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQTopicSubscriber; import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Properties; /** * The example creates a MessageConsumer on the specified * Topic and uses a MessageListener with this MessageConsumer * in order to enable asynchronous delivery. */ -public class Listener extends BaseExample +public class Listener { /* Used in log output. */ - private static final String CLASS = "Listener"; + private static final String CLASS="Listener"; /* An object to synchronize on. */ - private final static Object _lock = new Object(); + private final static Object _lock=new Object(); /* A boolean to indicate a clean finish. */ - private static int _finished = 0; + private static int _finished=0; /* A boolean to indicate an unsuccesful finish. */ - private static boolean _failed = false; - - /** - * Create an Listener client. - * - * @param args Command line arguments. - */ - public Listener(String[] args) - { - super(CLASS, args); - } + private static boolean _failed=false; /** * Run the message consumer example. @@ -57,7 +53,7 @@ public class Listener extends BaseExample */ public static void main(String[] args) { - Listener listener = new Listener(args); + Listener listener=new Listener(); listener.runTest(); } @@ -68,8 +64,15 @@ public class Listener extends BaseExample { try { + Properties properties=new Properties(); + properties.load(this.getClass().getResourceAsStream("pubsub.properties")); + + //Create the initial context + Context ctx=new InitialContext(properties); + // Declare the connection - TopicConnection connection = (TopicConnection) getConnection(); + ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory"); + TopicConnection connection=(TopicConnection) conFac.createConnection(); // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection // so that errors raised within the JMS client library can be reported to the application @@ -90,37 +93,57 @@ public class Listener extends BaseExample // This session is a default choice of non-transacted and uses // the auto acknowledge feature of a session. System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); - TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSession session=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - // lookup the topics usa - Topic topic = session.createTopic("usa.#"); + // lookup the topic usa + Topic topic=(Topic) ctx.lookup("usa"); // Create a Message Subscriber System.out.println(CLASS + ": Creating a Message Subscriber for topic usa.#"); - TopicSubscriber messageSubscriber = session.createSubscriber(topic); + TopicSubscriber messageSubscriber=session.createSubscriber(topic); + + // Bind each topic queue to the control queue so we know when to stop + // Warning: this is an AMQP specific code + ((AMQTopicSubscriber) messageSubscriber).addBindingKey( topic, "control"); + // Set a message listener on the messageConsumer messageSubscriber.setMessageListener(new MyMessageListener("usa")); - // lookup the topics world.usa.news - topic = session.createTopic("europe.#"); + // lookup the topic europe + topic=(Topic) ctx.lookup("europe"); // Create a Message Subscriber System.out.println(CLASS + ": Creating a Message Subscriber for topic europe.#"); - messageSubscriber = session.createSubscriber(topic); + messageSubscriber=session.createSubscriber(topic); + + // Bind each topic queue to the control queue so we know when to stop + // Warning: this is an AMQP specific code + ((AMQTopicSubscriber) messageSubscriber).addBindingKey( topic, "control"); + // Set a message listener on the messageConsumer messageSubscriber.setMessageListener(new MyMessageListener("europe")); - // lookup the topics world.europw - topic = session.createTopic("#.news"); + // lookup the topic news + topic=(Topic) ctx.lookup("news"); // Create a Message Subscriber System.out.println(CLASS + ": Creating a Message Subscriber for topic #.news"); - messageSubscriber = session.createSubscriber(topic); + messageSubscriber=session.createSubscriber(topic); + + // Bind each topic queue to the control queue so we know when to stop + // Warning: this is an AMQP specific code + ((AMQTopicSubscriber) messageSubscriber).addBindingKey( topic, "control"); + // Set a message listener on the messageConsumer messageSubscriber.setMessageListener(new MyMessageListener("news")); - // lookup the topics world.europw - topic = session.createTopic("#.weather"); + // lookup the topic weather + topic=(Topic) ctx.lookup("weather"); // Create a Message Subscriber System.out.println(CLASS + ": Creating a Message Subscriber for topic #.weather"); - messageSubscriber = session.createSubscriber(topic); + messageSubscriber=session.createSubscriber(topic); + + // Bind each topic queue to the control queue so we know when to stop + // Warning: this is an AMQP specific code + ((AMQTopicSubscriber) messageSubscriber).addBindingKey( topic, "control"); + // Set a message listener on the messageConsumer messageSubscriber.setMessageListener(new MyMessageListener("weather")); @@ -149,25 +172,23 @@ public class Listener extends BaseExample // Close the JNDI reference System.out.println(CLASS + ": Closing JNDI context"); - getInitialContext().close(); + ctx.close(); } catch (Exception exp) { + exp.printStackTrace(); System.err.println(CLASS + ": Caught an Exception: " + exp); } } private class MyMessageListener implements MessageListener { - /* The number of messages processed. */ - private int _messageCount = 0; - /* The topic this subscriber is subscribing to */ private String _topicName; public MyMessageListener(String topicName) { - _topicName = topicName; + _topicName=topicName; } /** @@ -182,12 +203,18 @@ public class Listener extends BaseExample { try { - // Increment the number of messages that have been received - _messageCount = _messageCount + 1; - // Print out the details of the just received message - System.out.println(CLASS + ": Received message for topic: " + _topicName + ": " + ((TextMessage) message).getText()); - // If this is the total number of messages required - if (((TextMessage) message).getText().equals("That's all, folks!")) + String text; + if (message instanceof TextMessage) + { + text=((TextMessage) message).getText(); + } + else + { + byte[] body=new byte[(int) ((BytesMessage) message).getBodyLength()]; + ((BytesMessage) message).readBytes(body); + text=new String(body); + } + if (text.equals("That's all, folks!")) { System.out.println(CLASS + ": Shutting down listener for " + _topicName); synchronized (_lock) @@ -196,6 +223,10 @@ public class Listener extends BaseExample _lock.notifyAll(); } } + else + { + System.out.println(CLASS + ": Received message for topic: " + _topicName + ": " + text); + } } catch (JMSException exp) { @@ -203,7 +234,7 @@ public class Listener extends BaseExample exp.printStackTrace(); synchronized (_lock) { - _failed = true; + _failed=true; _lock.notifyAll(); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java index 3748e106dd..fb22966b11 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java @@ -20,36 +20,27 @@ */ package org.apache.qpid.example.jmsexample.pubsub; -import org.apache.qpid.example.jmsexample.common.BaseExample; import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Properties; /** * Publish messages to topics */ -public class Publisher extends BaseExample +public class Publisher { /* Used in log output. */ - private static final String CLASS = "Publisher"; + private static final String CLASS="Publisher"; - /** - * Create a Publisher client. - * - * @param args Command line arguments. - */ - public Publisher(String[] args) - { - super(CLASS, args); - } - - /** - * Run the message publisher example. - * + /** + * Run the message producer example. * @param args Command line arguments. */ public static void main(String[] args) { - Publisher publisher = new Publisher(args); + Publisher publisher = new Publisher(); publisher.runTest(); } @@ -57,80 +48,62 @@ public class Publisher extends BaseExample { try { + Properties properties=new Properties(); + properties.load(this.getClass().getResourceAsStream("pubsub.properties")); + + //Create the initial context + Context ctx=new InitialContext(properties); + // Declare the connection - TopicConnection connection = (TopicConnection) getConnection(); + ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory"); + TopicConnection connection= (TopicConnection) conFac.createConnection(); // Create a session on the connection // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); - TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSession session=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // Create a Message TextMessage message; System.out.println(CLASS + ": Creating a TestMessage to send to the topics"); - message = session.createTextMessage(); + message=session.createTextMessage(); - // lookup the topics .usa.weather - Topic topic = session.createTopic("usa.weather"); - message.setStringProperty("topicName", "usa.weather"); + // lookup the topics usa.weather + Topic topic = (Topic)ctx.lookup("usa.weather"); // Create a Message Publisher System.out.println(CLASS + ": Creating a Message Publisher for topic usa.weather"); - TopicPublisher messagePublisher = session.createPublisher(topic); + TopicPublisher messagePublisher=session.createPublisher(topic); publishMessages(message, messagePublisher); // lookup the topics usa.news - topic = session.createTopic("usa.news"); - message.setStringProperty("topicName", "usa.news"); + topic = (Topic)ctx.lookup("usa.news"); // Create a Message Publisher System.out.println(CLASS + ": Creating a Message Publisher for topic usa.news"); - messagePublisher = session.createPublisher(topic); + messagePublisher=session.createPublisher(topic); publishMessages(message, messagePublisher); // lookup the topics europe.weather - topic = session.createTopic("europe.weather"); - message.setStringProperty("topicName", "europe.weather"); + topic = (Topic)ctx.lookup("europe.weather"); // Create a Message Publisher System.out.println(CLASS + ": Creating a Message Publisher for topic europe.weather"); - messagePublisher = session.createPublisher(topic); + messagePublisher=session.createPublisher(topic); publishMessages(message, messagePublisher); // lookup the topics europe.news - topic = session.createTopic("europe.news"); - message.setStringProperty("topicName", "europe.news"); + topic = (Topic)ctx.lookup("europe.news"); // Create a Message Publisher System.out.println(CLASS + ": Creating a Message Publisher for topic europe.news"); messagePublisher = session.createPublisher(topic); publishMessages(message, messagePublisher); // send the final message - message = session.createTextMessage("That's all, folks!"); - topic = session.createTopic("#.news"); - message.setStringProperty("topicName", "news"); - // Create a Message Publisher - messagePublisher = session.createPublisher(topic); - messagePublisher - .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - - topic = session.createTopic("#.weather"); - message.setStringProperty("topicName", "weather"); - // Create a Message Publisher - messagePublisher = session.createPublisher(topic); - messagePublisher - .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - - topic = session.createTopic("europe.#"); - message.setStringProperty("topicName", "europe"); + message=session.createTextMessage("That's all, folks!"); + topic = (Topic)ctx.lookup("control"); // Create a Message Publisher messagePublisher = session.createPublisher(topic); messagePublisher - .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + .send(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - topic = session.createTopic("usa.#"); - message.setStringProperty("topicName", "usa"); - // Create a Message Publisher - messagePublisher = session.createPublisher(topic); - messagePublisher - .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); // Close the connection to the broker System.out.println(CLASS + ": Closing connection"); @@ -138,7 +111,7 @@ public class Publisher extends BaseExample // Close the JNDI reference System.out.println(CLASS + ": Closing JNDI context"); - getInitialContext().close(); + ctx.close(); } catch (Exception exp) { @@ -148,15 +121,13 @@ public class Publisher extends BaseExample private void publishMessages(TextMessage message, TopicPublisher messagePublisher) throws JMSException { - // Loop to publish the requested number of messages. - for (int i = 1; i < getNumberMessages() + 1; i++) + // Loop to publish 5 messages. + for (int i=1; i <= 6; i++) { - // NOTE: We have NOT HAD TO START THE CONNECTION TO BEGIN SENDING messages, - // this is different to the consumer end as a CONSUMERS CONNECTIONS MUST BE STARTED BEFORE RECEIVING. - message.setText("Message " + i); + message.setText("message " + i); System.out.println(CLASS + ": Sending message: " + i); messagePublisher - .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + .send(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); } } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties new file mode 100644 index 0000000000..c72b0122df --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.usa.weather = usa.weather +topic.usa.news = usa.news +topic.europe.weather = europe.weather +topic.europe.news = europe.news +topic.weather = #.weather +topic.news = #.news +topic.europe = europe.# +topic.usa = usa.# +topic.control = control
\ No newline at end of file diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Client.java index 78f91a677d..0589a3801b 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Client.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Client.java @@ -20,35 +20,22 @@ */ package org.apache.qpid.example.jmsexample.requestResponse; -import org.apache.qpid.example.jmsexample.common.BaseExample; import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Properties; /** * This example illustrates the use of the JMS utility class <code>QueueRequestor</code> * which provides a synchronous RPC-like abstraction using temporary destinations * to deliver responses back to the client. - * <p/> - * <p>Run with <code>-help</code> argument for a description of command line arguments. */ -public class Client extends BaseExample +public class Client { /* Used in log output. */ - private static final String CLASS = "Client"; + private static final String CLASS="Client"; - /* The queue name */ - private String _queueName; - - /** - * Create a Client client. - * - * @param args Command line arguments. - */ - public Client(String[] args) - { - super(CLASS, args); - _queueName = _argProcessor.getStringArgument("-queueName"); - } /** * Run the message requestor example. @@ -57,9 +44,7 @@ public class Client extends BaseExample */ public static void main(String[] args) { - _options.put("-queueName", "The queue name"); - _defaults.put("-queueName", "request"); - Client requestor = new Client(args); + Client requestor=new Client(); requestor.runTest(); } @@ -70,8 +55,18 @@ public class Client extends BaseExample { try { - // Declare the connection - QueueConnection connection = (QueueConnection) getConnection(); + // Load JNDI properties + Properties properties=new Properties(); + properties.load(this.getClass().getResourceAsStream("requestResponse.properties")); + + //Create the initial context + Context ctx=new InitialContext(properties); + + // Lookup the connection factory + ConnectionFactory conFac = (ConnectionFactory) ctx.lookup("qpidConnectionfactory"); + + // create the connection + QueueConnection connection = (QueueConnection) conFac.createConnection(); // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection // so that errors raised within the JMS client library can be reported to the application @@ -94,8 +89,7 @@ public class Client extends BaseExample QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); // Lookup the destination - System.out.println(CLASS + ": Looking up queue with name: " + _queueName); - Queue destination = session.createQueue(_queueName); + Queue destination = (Queue) ctx.lookup("requestQueue"); // Create a QueueRequestor System.out.println(CLASS + ": Creating a QueueRequestor"); @@ -109,34 +103,26 @@ public class Client extends BaseExample // Create a message to send as a request for service TextMessage request; - request = session.createTextMessage(); + // Send some messages to the server's request queue + String[] messages = {"Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe."}; // Get the number of times that this sample should request service - for (int i = 0; i < getNumberMessages(); i++) + for (String message : messages) { - request = session.createTextMessage("Twas brillig, and the slithy toves"); - sendReceive(request, requestor); - request = session.createTextMessage("Did gire and gymble in the wabe"); - sendReceive(request, requestor); - request = session.createTextMessage("All mimsy were the borogroves,"); - sendReceive(request, requestor); - request = session.createTextMessage("And the mome raths outgrabe."); - sendReceive(request, requestor); + request = session.createTextMessage(message); + sendReceive(request, requestor); } - //send the final message for ending the mirror - // And send a final message to indicate termination. - request.setText("That's all, folks!"); - MessageProducer messageProducer = session.createProducer(destination); - messageProducer.send(request, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - // Close the connection to the server System.out.println(CLASS + ": Closing connection"); connection.close(); // Close the JNDI reference System.out.println(CLASS + ": Closing JNDI context"); - getInitialContext().close(); + ctx.close(); } catch (Exception exp) { @@ -147,19 +133,19 @@ public class Client extends BaseExample private void sendReceive(TextMessage request, QueueRequestor requestor) throws JMSException { Message response; - response = requestor.request(request); + response=requestor.request(request); System.out.println(CLASS + ": \tRequest Content= " + request.getText()); // Print out the details of the response received String text; if (response instanceof TextMessage) { - text = ((TextMessage) response).getText(); + text=((TextMessage) response).getText(); } else { - byte[] body = new byte[(int) ((BytesMessage) response).getBodyLength()]; + byte[] body=new byte[(int) ((BytesMessage) response).getBodyLength()]; ((BytesMessage) response).readBytes(body); - text = new String(body); + text=new String(body); } System.out.println(CLASS + ": \tResponse Content= " + text); } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Server.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Server.java index 4c0c47f7b4..2ac349a879 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Server.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Server.java @@ -20,9 +20,10 @@ */ package org.apache.qpid.example.jmsexample.requestResponse; -import org.apache.qpid.example.jmsexample.common.BaseExample; - import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Properties; /** * The example creates a MessageConsumer on the specified @@ -30,28 +31,11 @@ import javax.jms.*; * received message has a ReplyTo header then a new response message is sent * to that specified destination. */ -public class Server extends BaseExample +public class Server { /* Used in log output. */ - private static final String CLASS = "Server"; - - /* The destination type */ - private String _destinationType; - - /* The destination Name */ - private String _destinationName; + private static final String CLASS="Server"; - /** - * Create a Server client. - * - * @param args Command line arguments. - */ - public Server(String[] args) - { - super(CLASS, args); - _destinationType = _argProcessor.getStringArgument("-destinationType"); - _destinationName = _argProcessor.getStringArgument("-destinationName"); - } /** * Run the message mirror example. @@ -60,11 +44,7 @@ public class Server extends BaseExample */ public static void main(String[] args) { - _options.put("-destinationType", "Destination Type: queue/topic"); - _defaults.put("-destinationType", "queue"); - _options.put("-destinationName", "Destination Name"); - _defaults.put("-destinationName", "request"); - Server server = new Server(args); + Server server=new Server(); server.runTest(); } @@ -75,8 +55,18 @@ public class Server extends BaseExample { try { - // Declare the connection - Connection connection = getConnection(); + // Load JNDI properties + Properties properties=new Properties(); + properties.load(this.getClass().getResourceAsStream("requestResponse.properties")); + + //Create the initial context + Context ctx=new InitialContext(properties); + + // Lookup the connection factory + ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory"); + + // create the connection + Connection connection=conFac.createConnection(); // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection // so that errors raised within the JMS client library can be reported to the application @@ -99,29 +89,18 @@ public class Server extends BaseExample // the auto acknowledge feature of a session. System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination; + // Lookup the destination + Queue destination = (Queue) ctx.lookup("requestQueue"); - if (_destinationType.equals("queue")) - { - // Lookup the queue - System.out.println(CLASS + ": Looking up queue with name: " + _destinationName); - destination = session.createQueue(_destinationName); - } - else - { - // Lookup the topic - System.out.println(CLASS + ": Looking up topic with name: " + _destinationName); - destination = session.createTopic(_destinationName); - } // Create a MessageConsumer System.out.println(CLASS + ": Creating a MessageConsumer"); - MessageConsumer messageConsumer = session.createConsumer(destination); + MessageConsumer messageConsumer=session.createConsumer(destination); /** - * Create a MessageProducer - note that although we create the + * Create a MessageProducer */ System.out.println(CLASS + ": Creating a MessageProducer"); MessageProducer messageProducer; @@ -133,46 +112,35 @@ public class Server extends BaseExample // Cycle round until all the messages are consumed. Message requestMessage; TextMessage responseMessage; - boolean end = false; + boolean end=false; while (!end) { System.out.println(CLASS + ": Receiving the message"); - requestMessage = messageConsumer.receive(); + requestMessage=messageConsumer.receive(); String text; if (requestMessage instanceof TextMessage) { - text = ((TextMessage) requestMessage).getText(); + text=((TextMessage) requestMessage).getText(); } else { - byte[] body = new byte[(int) ((BytesMessage) requestMessage).getBodyLength()]; + byte[] body=new byte[(int) ((BytesMessage) requestMessage).getBodyLength()]; ((BytesMessage) requestMessage).readBytes(body); - text = new String(body); - } - - - if (text.equals("That's all, folks!")) - { - System.out.println(CLASS + ": Received final message for " + destination); - end = true; - } - else - { - System.out.println(CLASS + ": \tContents = " + text); + text=new String(body); } // Now bounce the message if a ReplyTo header was set. if (requestMessage.getJMSReplyTo() != null) { - System.out.println(CLASS + ": Activating response queue listener for: " + destination); - responseMessage = session.createTextMessage(); + System.out.println(CLASS + ": Activating response queue listener"); + responseMessage=session.createTextMessage(); responseMessage.setText(text.toUpperCase()); System.out.println(CLASS + ": \tResponse = " + responseMessage.getText()); - messageProducer = session.createProducer(requestMessage.getJMSReplyTo()); + messageProducer=session.createProducer(requestMessage.getJMSReplyTo()); messageProducer.send(responseMessage); } System.out.println(); @@ -184,7 +152,7 @@ public class Server extends BaseExample // Close the JNDI reference System.out.println(CLASS + ": Closing JNDI context"); - getInitialContext().close(); + ctx.close(); } catch (Exception exp) { diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties new file mode 100644 index 0000000000..e732ce560d --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.requestQueue = request diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java index 7b52127313..f3bf9f8686 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java @@ -20,9 +20,10 @@ */ package org.apache.qpid.example.jmsexample.transacted; -import org.apache.qpid.example.jmsexample.common.BaseExample; - import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Properties; /** * Transactional message example sends a number of messages to a Queue @@ -40,16 +41,11 @@ import javax.jms.*; * </ul> * <p/> */ -public class QueueToTopic extends BaseExample +public class QueueToTopic { /* Used in log output. */ - private static final String CLASS = "QueueToTopic"; - - /* The queue name */ - private String _queueName; + private static final String CLASS="QueueToTopic"; - /* The topic name */ - private String _topicName; /* Specify if the transaction is committed */ private boolean _commit; @@ -57,31 +53,29 @@ public class QueueToTopic extends BaseExample /** * Create a QueueToTopic client. * - * @param args Command line arguments. + * @param commit Specifies if the transaction should be committed. */ - public QueueToTopic(String[] args) + public QueueToTopic(boolean commit) { - super(CLASS, args); - _queueName = _argProcessor.getStringArgument("-queueName"); - _topicName = _argProcessor.getStringArgument("-topicName"); - _commit = _argProcessor.getBooleanArgument("-commit"); + _commit=commit; } /** * Run the message mover example. * * @param args Command line arguments. - * @see BaseExample */ public static void main(String[] args) { - _options.put("-topicName", "The topic name"); - _defaults.put("-topicName", "world"); - _options.put("-queueName", "The queue name"); - _defaults.put("-queueName", "message_queue"); - _options.put("-commit", "Commit or rollback the transaction (true|false)"); - _defaults.put("-commit", "true"); - QueueToTopic mover = new QueueToTopic(args); + boolean commit=true; + if (args.length > 1) + { + if (args[0].equalsIgnoreCase("-rollback")) + { + commit=!Boolean.getBoolean(args[1]); + } + } + QueueToTopic mover=new QueueToTopic(commit); mover.runTest(); } @@ -89,8 +83,17 @@ public class QueueToTopic extends BaseExample { try { - // Declare the connection - Connection connection = getConnection(); + // Load JNDI properties + Properties properties=new Properties(); + properties.load(this.getClass().getResourceAsStream("transacted.properties")); + + //Create the initial context + Context ctx=new InitialContext(properties); + + // Lookup the connection factory + ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory"); + // create the connection + Connection connection=conFac.createConnection(); // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection // so that errors raised within the JMS client library can be reported to the application @@ -116,24 +119,22 @@ public class QueueToTopic extends BaseExample * from the topic. */ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); - Session nonTransactedSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session nonTransactedSession=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Lookup the queue - System.out.println(CLASS + ": Looking up queue with name: " + _queueName); - Queue queue = nonTransactedSession.createQueue(_queueName); + Queue queue=(Queue) ctx.lookup("transactedQueue"); // Lookup the topic - System.out.println(CLASS + ": Looking up topic with name: " + _topicName); - Topic topic = nonTransactedSession.createTopic(_topicName); - + Topic topic=(Topic) ctx.lookup("transactedTopic"); + // Make sure that the queue is empty System.out.print(CLASS + ": Purging messages from queue..."); - MessageConsumer queueMessageConsumer = nonTransactedSession.createConsumer(queue); + MessageConsumer queueMessageConsumer=nonTransactedSession.createConsumer(queue); Message purgedMessage; - int numberPurged = -1; + int numberPurged=-1; do { - purgedMessage = queueMessageConsumer.receiveNoWait(); + purgedMessage=queueMessageConsumer.receiveNoWait(); numberPurged++; } while (purgedMessage != null); @@ -141,41 +142,42 @@ public class QueueToTopic extends BaseExample // Create the MessageProducer for the queue System.out.println(CLASS + ": Creating a MessageProducer for the queue"); - MessageProducer messageProducer = nonTransactedSession.createProducer(queue); + MessageProducer messageProducer=nonTransactedSession.createProducer(queue); // Now create the MessageConsumer for the topic System.out.println(CLASS + ": Creating a MessageConsumer for the topic"); - MessageConsumer topicMessageConsumer = nonTransactedSession.createConsumer(topic); + MessageConsumer topicMessageConsumer=nonTransactedSession.createConsumer(topic); // Create a textMessage. We're using a TextMessage for this example. System.out.println(CLASS + ": Creating a TestMessage to send to the destination"); - TextMessage textMessage = nonTransactedSession.createTextMessage("Sample text message"); + TextMessage textMessage=nonTransactedSession.createTextMessage("Sample text message"); // Loop to publish the requested number of messages to the queue. - for (int i = 1; i < getNumberMessages() + 1; i++) + for (int i=1; i <= 5; i++) { messageProducer - .send(textMessage, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + .send(textMessage, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, + Message.DEFAULT_TIME_TO_LIVE); // Print out details of textMessage just sent System.out.println(CLASS + ": Message sent: " + i + " " + textMessage.getJMSMessageID()); } // Create a new transacted Session to move the messages from the queue to the topic - Session transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED); + Session transactedSession=connection.createSession(true, Session.SESSION_TRANSACTED); // Create a new message consumer from the queue - MessageConsumer transactedConsumer = transactedSession.createConsumer(queue); + MessageConsumer transactedConsumer=transactedSession.createConsumer(queue); // Create a new message producer for the topic - MessageProducer transactedProducer = transactedSession.createProducer(topic); + MessageProducer transactedProducer=transactedSession.createProducer(topic); // Loop to consume the messages from the queue and publish them to the topic Message receivedMessage; - for (int i = 1; i < getNumberMessages() + 1; i++) + for (int i=1; i <= 5; i++) { // Receive a message - receivedMessage = transactedConsumer.receive(); + receivedMessage=transactedConsumer.receive(); System.out.println(CLASS + ": Moving message: " + i + " " + receivedMessage.getJMSMessageID()); // Publish it to the topic transactedProducer.send(receivedMessage); @@ -192,7 +194,7 @@ public class QueueToTopic extends BaseExample System.out.println(CLASS + ": Rolling back transacted session."); transactedSession.rollback(); } - + // Now consume any outstanding messages on the queue System.out.print(CLASS + ": Mopping up messages from queue"); if (_commit) @@ -201,15 +203,15 @@ public class QueueToTopic extends BaseExample } else { - System.out.print(" (expecting " + getNumberMessages() + ")..."); + System.out.print(" (expecting " + 5 + ")..."); } Message moppedMessage; - int numberMopped = 0; + int numberMopped=0; do { - moppedMessage = queueMessageConsumer.receiveNoWait(); - if( moppedMessage != null) + moppedMessage=queueMessageConsumer.receiveNoWait(); + if (moppedMessage != null) { numberMopped++; } @@ -222,18 +224,18 @@ public class QueueToTopic extends BaseExample if (_commit) { - System.out.print(" (expecting " + getNumberMessages() + ")..."); + System.out.print(" (expecting " + 5 + ")..."); } else { System.out.print(" (expecting none)..."); } - numberMopped = 0; + numberMopped=0; do { - moppedMessage = topicMessageConsumer.receiveNoWait(); - if( moppedMessage != null) + moppedMessage=topicMessageConsumer.receiveNoWait(); + if (moppedMessage != null) { numberMopped++; } @@ -247,7 +249,7 @@ public class QueueToTopic extends BaseExample // Close the JNDI reference System.out.println(CLASS + ": Closing JNDI context"); - getInitialContext().close(); + ctx.close(); } catch (Exception exp) { diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties new file mode 100644 index 0000000000..394d5f9036 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.transactedQueue = transactedQueue + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.transactedTopic = transactedTopic
\ No newline at end of file |