From b442c78351bf330cf23b67e86aa17424d5a78966 Mon Sep 17 00:00:00 2001 From: Marnie McCormack Date: Fri, 24 Nov 2006 12:12:43 +0000 Subject: Initial example class overhaul - still some way to go. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@478854 13f79535-47bb-0310-9956-ffa450edef68 --- .../example/publisher/FileMessageDispatcher.java | 20 +-- .../example/publisher/MessageFactoryException.java | 6 - .../publisher/MonitorMessageDispatcher.java | 9 +- .../qpid/example/publisher/MonitorPublisher.java | 15 +- .../apache/qpid/example/publisher/Publisher.java | 94 +++--------- .../publisher/UndeliveredMessageException.java | 4 - .../qpid/example/shared/ConnectionException.java | 6 - .../qpid/example/shared/ContextException.java | 73 ++++++++++ .../org/apache/qpid/example/shared/FileUtils.java | 4 - .../qpid/example/shared/InitialContextHelper.java | 78 ++++++++++ .../org/apache/qpid/example/shared/Statics.java | 5 +- .../apache/qpid/example/shared/example.properties | 21 +++ .../example/subscriber/MonitoredSubscriber.java | 51 +++---- .../subscriber/MonitoredSubscriptionWrapper.java | 8 +- .../apache/qpid/example/subscriber/Subscriber.java | 158 ++++++++------------- .../example/subscriber/SubscriptionWrapper.java | 8 +- .../apache/qpid/example/test/TestAMSPubSub.java | 14 +- .../qpid/example/test/TestMultSubscribers.java | 10 +- .../apache/qpid/example/test/TestPublisher.java | 4 +- .../apache/qpid/example/test/TestSubscriber.java | 70 --------- 20 files changed, 293 insertions(+), 365 deletions(-) create mode 100644 java/client/src/test/java/org/apache/qpid/example/shared/ContextException.java create mode 100644 java/client/src/test/java/org/apache/qpid/example/shared/InitialContextHelper.java create mode 100644 java/client/src/test/java/org/apache/qpid/example/shared/example.properties delete mode 100644 java/client/src/test/java/org/apache/qpid/example/test/TestSubscriber.java (limited to 'java') diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java index f338a0b140..ca3e5ce3f5 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java @@ -30,11 +30,7 @@ import javax.jms.JMSException; /** * Class that sends message files to the Publisher to distribute * using files as input - * Must set system properties for host etc or amend and use config props - * Author: Marnie McCormack - * Date: 20-Jul-2006 - * Time: 09:56:56 - * Copyright JPMorgan Chase 2006 + * Must set properties for host in properties file or uses in vm broker */ public class FileMessageDispatcher { @@ -47,7 +43,7 @@ public class FileMessageDispatcher { public static void main(String[] args) { - //Check command line args ok - must provide a path or file for us to run + //Check command line args ok - must provide a path or file for us to dispatch if (args.length == 0) { System.err.println("Usage: FileMessageDispatcher " + ""); @@ -134,8 +130,6 @@ public class FileMessageDispatcher { /* * Returns a _publisher for a queue - * Using system properties to get connection info for now - * Must set using -D the host, client, queue, user, pwd, virtual path, archive path */ private static Publisher getPublisher() { @@ -144,14 +138,8 @@ public class FileMessageDispatcher { return _publisher; } - //Create _publisher using system properties - Properties props = System.getProperties(); - - //Create a _publisher using failover details - _publisher = new Publisher(props.getProperty(Statics.HOST_PROPERTY), - props.getProperty(Statics.CLIENT_PROPERTY), props.getProperty(Statics.QUEUE_PROPERTY), - props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY), - props.getProperty(Statics.VIRTUAL_PATH_PROPERTY), props.getProperty(Statics.ARCHIVE_PATH)); + //Create a _publisher + _publisher = new Publisher(); _publisher.setName(DEFAULT_PUB_NAME); return _publisher; diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactoryException.java index f9dea268d2..34360d6708 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactoryException.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactoryException.java @@ -20,12 +20,6 @@ package org.apache.qpid.example.publisher; import org.apache.log4j.Logger; -/** - * Author: Marnie McCormack - * Date: 18-Jul-2006 - * Time: 11:13:23 - * Copyright JPMorgan Chase 2006 - */ public class MessageFactoryException extends Exception { private int _errorCode; diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index e47a66cbc1..16b32da22a 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -27,10 +27,6 @@ import java.util.Properties; /** * Class that sends heartbeat messages to allow monitoring of message consumption * Sends regular (currently 20 seconds apart) heartbeat message - * Author: Marnie McCormack - * Date: 20-Jul-2006 - * Time: 09:56:56 - * Copyright JPMorgan Chase 2006 */ public class MonitorMessageDispatcher { @@ -122,10 +118,7 @@ public class MonitorMessageDispatcher { Properties props = System.getProperties(); //Create a _publisher using failover details and constant for monitor queue - _monitorPublisher = new MonitorPublisher(props.getProperty(Statics.HOST_PROPERTY), - props.getProperty(Statics.CLIENT_PROPERTY), Statics.MONITOR_QUEUE, - props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY), - props.getProperty(Statics.VIRTUAL_PATH_PROPERTY), props.getProperty(Statics.ARCHIVE_PATH)); + _monitorPublisher = new MonitorPublisher(); _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); return _monitorPublisher; diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorPublisher.java index 0c5e81a0d1..233c3fea0a 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorPublisher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorPublisher.java @@ -27,10 +27,6 @@ import org.apache.log4j.Logger; /** * Subclass of Publisher which uses QPID functionality to send a heartbeat message * Note immediate flag not available via JMS MessageProducer - * Author: Marnie McCormack - * Date: 12-Sep-2006 - * Time: 09:41:07 - * Copyright JPMorgan Chase 2006 */ public class MonitorPublisher extends Publisher { @@ -39,16 +35,9 @@ public class MonitorPublisher extends Publisher BasicMessageProducer _producer; - public MonitorPublisher(String host, int port, String clientID, String queueName, - String user, String password, String virtualPath, String destinationDir) + public MonitorPublisher() { - super(host,port,clientID,queueName,user,password,virtualPath,destinationDir); - } - - public MonitorPublisher(String hostdetails, String clientID, String queueName, - String user, String password, String virtualPath, String destinationDir) - { - super(hostdetails,clientID,queueName,user,password,virtualPath,destinationDir); + super(); } /* diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java index 0e66599fc6..d64fd9b142 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java @@ -20,7 +20,7 @@ package org.apache.qpid.example.publisher; import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.jms.Session; @@ -30,15 +30,16 @@ import javax.jms.DeliveryMode; import javax.jms.Queue; import javax.jms.MessageProducer; import javax.jms.Connection; +import javax.naming.InitialContext; - -import org.apache.qpid.example.shared.ConnectionException; -import org.apache.qpid.example.shared.Statics; +import org.apache.qpid.example.shared.InitialContextHelper; public class Publisher { private static final Logger _log = Logger.getLogger(Publisher.class); + protected InitialContextHelper _contextHelper; + protected Connection _connection; protected Session _session; @@ -51,62 +52,33 @@ public class Publisher protected Queue _destination; + protected static final String _defaultDestinationDir = "/tmp"; + //constructor for use with a single host - public Publisher(String host, int port, String clientID, String queueName, - String user, String password, String virtualPath, String destinationDir) + public Publisher() { try { - createConnection(host, port, clientID, user, password, virtualPath); - - //create a transactional session - _session = (Session) _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + //get an initial context from default properties + _contextHelper = new InitialContextHelper(null); + InitialContext ctx = _contextHelper.getInitialContext(); - //now using a queue rather than a topic - //AMQTopic destination = new AMQTopic(topicName); - //Queue is non-exclusive and not deleted when last consumer detaches - _destination = _session.createQueue(queueName); - - //create a message producer - _producer = _session.createProducer(_destination); - - //set destination dir for files that have been processed - _destinationDir = destinationDir; - - _connection.start(); - } - catch (Exception e) - { - e.printStackTrace(); - _log.error(e); - } - } - - //constructor that allows for multiple host details to be provided for failover - public Publisher(String hostdetails, String clientID, String queueName, - String user, String password, String virtualPath, String destinationDir) - { - try - { - if (queueName==null||queueName.length()==0) - { - queueName = Statics.QUEUE_NAME; - } - createConnectionWithFailover(hostdetails, clientID, user, password, virtualPath); + //then create a connection using the AMQConnectionFactory + AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local"); + _connection = cf.createConnection(); //create a transactional session _session = (Session) _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - //now using a queue rather than a topic - //AMQTopic destination = new AMQTopic(topicName); + //lookup the example queue and use it //Queue is non-exclusive and not deleted when last consumer detaches - _destination = _session.createQueue(queueName); + _destination = _session.createQueue((String)ctx.lookup("MyQueue")); //create a message producer _producer = _session.createProducer(_destination); //set destination dir for files that have been processed - _destinationDir = destinationDir; + _destinationDir = _defaultDestinationDir; _connection.start(); } @@ -117,9 +89,9 @@ public class Publisher } } - /* + /** * Publishes a non-persistent message using transacted session - */ + **/ public boolean sendMessage(Message message) { try @@ -186,34 +158,6 @@ public class Publisher _destinationDir = destinationDir; } - //ONly using one set of host details - private void createConnection(String host, int port, String clientID, String user, String password, String virtualPath) - throws ConnectionException - { - try - { - _connection = new AMQConnection(host, port, user, password, clientID, virtualPath); - } - catch (Exception e) - { - throw new ConnectionException(e.toString()); - } - } - - //Create connection with more than one set of host details for failover - private void createConnectionWithFailover(String hostdetails, String clientID, String user, String password, String virtualPath) - throws ConnectionException - { - try - { - _connection = new AMQConnection(hostdetails, user, password, clientID, virtualPath); - } - catch (Exception e) - { - throw new ConnectionException(e.toString()); - } - } - public String getName() { return _name; diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/java/client/src/test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java index 2ac5ca7c85..3335833c2d 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java @@ -22,10 +22,6 @@ import org.apache.log4j.Logger; /** * Exception thrown by monitor when cannot send a message marked for immediate delivery - * Author: Marnie McCormack - * Date: 18-Jul-2006 - * Time: 11:13:23 - * Copyright JPMorgan Chase 2006 */ public class UndeliveredMessageException extends Exception { diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/ConnectionException.java b/java/client/src/test/java/org/apache/qpid/example/shared/ConnectionException.java index e32ffe6b10..8723983862 100644 --- a/java/client/src/test/java/org/apache/qpid/example/shared/ConnectionException.java +++ b/java/client/src/test/java/org/apache/qpid/example/shared/ConnectionException.java @@ -20,12 +20,6 @@ package org.apache.qpid.example.shared; import org.apache.log4j.Logger; -/** - * Author: Marnie McCormack - * Date: 18-Jul-2006 - * Time: 11:13:23 - * Copyright JPMorgan Chase 2006 - */ public class ConnectionException extends Exception { private int _errorCode; diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/ContextException.java b/java/client/src/test/java/org/apache/qpid/example/shared/ContextException.java new file mode 100644 index 0000000000..787cecd541 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/example/shared/ContextException.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.shared; + +import org.apache.log4j.Logger; + +public class ContextException extends Exception { + + private int _errorCode; + + public ContextException(String message) + { + super(message); + } + + public ContextException(String msg, Throwable t) + { + super(msg, t); + } + + public ContextException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public ContextException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public ContextException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public ContextException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public ContextException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } +} diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/FileUtils.java b/java/client/src/test/java/org/apache/qpid/example/shared/FileUtils.java index 8b0e6e7378..54446cb6a7 100644 --- a/java/client/src/test/java/org/apache/qpid/example/shared/FileUtils.java +++ b/java/client/src/test/java/org/apache/qpid/example/shared/FileUtils.java @@ -22,10 +22,6 @@ import java.io.*; /** * Class that provides file related utility methods for utility use - * Author: Marnie McCormack - * Date: 20-Jul-2006 - * Time: 08:17:16 - * Copyright JPMorgan Chase 2006 */ public class FileUtils { diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/src/test/java/org/apache/qpid/example/shared/InitialContextHelper.java new file mode 100644 index 0000000000..b39892b688 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/example/shared/InitialContextHelper.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.shared; + +import org.apache.log4j.Logger; + +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; +import java.io.InputStream; +import java.io.IOException; + +/** + * Class that provides helper methods for JNDI + */ +public class InitialContextHelper { + + public static final String _defaultPropertiesName = "example.properties"; + protected static Properties _fileProperties; + protected static InitialContext _initialContext; + protected static final Logger _log = Logger.getLogger(InitialContextHelper.class); + + public InitialContextHelper(String propertiesName) throws ContextException + { + try + { + if (propertiesName == null || propertiesName.length() == 0) + { + propertiesName = _defaultPropertiesName; + } + + _fileProperties = new Properties(); + ClassLoader cl = this.getClass().getClassLoader(); + + //NB: Need to change path to reflect package if moving classes around ! + InputStream is = cl.getResourceAsStream("org/apache/qpid/example/shared/" + propertiesName); + _fileProperties.load(is); + _initialContext = new InitialContext(_fileProperties); + } + catch (IOException e) + { + throw new ContextException(_log, e.toString()); + } + catch (NamingException n) + { + throw new ContextException(_log, n.toString()); + } + } + + public Properties getFileProperties() + { + return _fileProperties; + } + + public InitialContext getInitialContext() + { + return _initialContext; + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/Statics.java b/java/client/src/test/java/org/apache/qpid/example/shared/Statics.java index 7516cde29d..c056f8a7da 100644 --- a/java/client/src/test/java/org/apache/qpid/example/shared/Statics.java +++ b/java/client/src/test/java/org/apache/qpid/example/shared/Statics.java @@ -20,9 +20,6 @@ package org.apache.qpid.example.shared; /** * Constants used by AMS Publisher/Subscriber classes - * Author: Marnie McCormack - * Date: 18-Jul-2006 - * Time: 09:19:33 */ public class Statics { @@ -30,7 +27,7 @@ public class Statics { public static final String QUEUE_NAME = "EXAMPLE_QUEUE"; - public static final String MONITOR_QUEUE = "MONITOR_QUEUE"; + public static final String MONITOR_QUEUE_SUFFIX = "_MONITOR"; public static final String HOST_PROPERTY = "host"; diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/example.properties b/java/client/src/test/java/org/apache/qpid/example/shared/example.properties new file mode 100644 index 0000000000..82de41908f --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/example/shared/example.properties @@ -0,0 +1,21 @@ +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# use the following property to configure the default connector +#java.naming.provider.url - ignored. + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='vm://:1' + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.MyQueue = example.MyQueue + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.ibmStocks = stocks.nyse.ibm + +# Register an AMQP destination in JNDI +# NOTE: Qpid currently only supports direct,topics and headers +# destination.[jniName] = [BindingURL] +destination.direct = direct://amq.direct//directQueue diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java index 361dc16467..d6e020bf43 100644 --- a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java +++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java @@ -21,25 +21,27 @@ package org.apache.qpid.example.subscriber; import org.apache.log4j.Logger; import org.apache.qpid.example.shared.Statics; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.JMSException; -import javax.jms.Queue; +import javax.jms.*; /** * Subclass of Subscriber which consumes a heartbeat message - * Author: Marnie McCormack - * Date: 12-Sep-2006 - * Time: 09:41:07 - * Copyright JPMorgan Chase 2006 */ public class MonitoredSubscriber extends Subscriber { + protected String _monitorDestinationName; + private static final Logger _logger = Logger.getLogger(MonitoredSubscriber.class); private static MessageConsumer _monitorConsumer; + public MonitoredSubscriber() + { + super(); + //lookup queue name and append suffix + _monitorDestinationName = _destinationName + Statics.MONITOR_QUEUE_SUFFIX; + } + public static class MonitorMessageListener implements MessageListener { private String _name; @@ -79,39 +81,32 @@ public class MonitoredSubscriber extends Subscriber /* * Subscribes to Queue and attaches additional monitor listener - * @param hostdetails - for broker connection in host1:port1;host2:port2 format - * @param username - for connection to the broker - * @password - for connection to the broker - * @virtualpath */ - public void subscribeAndMonitor(String hostdetails, String username, String password, - String virtualPath, String queueName) + public void subscribeAndMonitor() { - Queue queue; - try { - //Create monitor comsumer for failover purposes - if (queueName==null||queueName.length()==0) - { - queue = getSession(_connection).createQueue(Statics.QUEUE_NAME); - } - else - { - queue = getSession(_connection).createQueue(queueName); - } + _connection = _connectionFactory.createConnection(); + + //create a transactional session + Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //Queue is non-exclusive and not deleted when last consumer detaches + Destination destination = session.createQueue(_monitorDestinationName); - _monitorConsumer = getSession(_connection).createConsumer(queue); + //Create a consumer with a destination of our queue which will use defaults for prefetch etc + _monitorConsumer = session.createConsumer(destination); //give the monitor message listener a name of it's own - _monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener("MonitorListener " + System.currentTimeMillis())); + _monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener + ("MonitorListener " + System.currentTimeMillis())); MonitoredSubscriber._logger.info("Starting monitored subscription ..."); MonitoredSubscriber._connection.start(); //and now start ordinary consumption too - subscribe(hostdetails,username,password,virtualPath,queueName); + subscribe(); } catch (Throwable t) { diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java index 3bc1c5ea08..d6ec8bd5de 100644 --- a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java +++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java @@ -25,10 +25,6 @@ import java.util.Properties; /** * Allows you to simply start a monitored subscriber - * Author: Marnie McCormack - * Date: 08-Aug-2006 - * Time: 12:05:52 - * Copyright JPMorgan Chase 2006 */ public class MonitoredSubscriptionWrapper { @@ -46,9 +42,7 @@ public class MonitoredSubscriptionWrapper { //note that for failover should set -Dhost=host1:port1;host2:port2 //Client will then failover in order i.e. connect to first host and failover to second and so on - _subscriber.subscribe(props.getProperty(Statics.HOST_PROPERTY), - props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY), - props.getProperty(Statics.VIRTUAL_PATH_PROPERTY), props.getProperty(Statics.QUEUE_NAME)); + _subscriber.subscribe(); } //Stop subscribing now ... diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java index 760eb2d108..6b89567b83 100644 --- a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java +++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java @@ -19,43 +19,64 @@ package org.apache.qpid.example.subscriber; import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; import javax.jms.*; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.naming.InitialContext; -import org.apache.qpid.example.shared.Statics; -import org.apache.qpid.example.shared.ConnectionException; - -import java.net.InetAddress; +import org.apache.qpid.example.shared.InitialContextHelper; /** * Subscriber which consumes messages from a queue - * Author: Marnie McCormack - * Date: 12-Sep-2006 - * Time: 09:41:07 - * Copyright JPMorgan Chase 2006 */ public class Subscriber { - private static final Logger _logger = Logger.getLogger(Subscriber.class); + private static final Logger _log = Logger.getLogger(Subscriber.class); protected static Connection _connection; protected static MessageConsumer _consumer; - protected static Session _session; + protected static InitialContextHelper _contextHelper; + + protected static AMQConnectionFactory _connectionFactory; + + protected String _destinationName; + + public Subscriber() + { + try + { + //get an initial context from default properties + _contextHelper = new InitialContextHelper(null); + InitialContext ctx = _contextHelper.getInitialContext(); + + //then create a connection using the AMQConnectionFactory + _connectionFactory = (AMQConnectionFactory) ctx.lookup("local"); + //lookup queue name + _destinationName = (String) ctx.lookup("MyQueue"); + } + catch (Exception e) + { + e.printStackTrace(); + _log.error(e); + } + } /* * Listener class that handles messages */ - public static class AMSMessageListener implements MessageListener + public static class ExampleMessageListener implements MessageListener { private String _name; - public AMSMessageListener(String name) + public ExampleMessageListener(String name) { _name = name; @@ -67,76 +88,63 @@ public class Subscriber */ public void onMessage(javax.jms.Message message) { - _logger.info(_name + " got message '" + message + "'"); + _log.info(_name + " got message '" + message + "'"); try { - //@TODO handle your message appropriately for your application here ? + //NB: Handle your message appropriately for your application here + //do some stuff - _logger.debug("Acknowledging recieved message"); + _log.debug("Acknowledging recieved message"); //Now acknowledge the message to clear it from our queue message.acknowledge(); } catch(JMSException j) { - _logger.error("JMSException trying to acknowledge message receipt"); + _log.error("JMSException trying to acknowledge message receipt"); j.printStackTrace(); } catch(Exception e) { - _logger.error("Unexpected exception trying to handle message"); + _log.error("Unexpected exception trying to handle message"); e.printStackTrace(); } } } /* - * Subscribes to AMS Queue and attaches listener - * @param hostdetails - for broker connection in host1:port1;host2:port2 format - * @param username - for connection to the broker - * @password - for connection to the broker - * @virtualpath + * Subscribes to example Queue and attaches listener */ - public void subscribe(String hostdetails, String username, String password, - String virtualPath, String queue) + public void subscribe() { - Queue q; - - _logger.info("Starting subscription ..."); - + _log.info("Starting subscription ..."); try { - //To enable failover simply specify more than one host:port combination for hostdetails - //Format is host1:port1;host2:port2 - _connection = getConnectionWithFailover(hostdetails,username,password,virtualPath); + _connection = _connectionFactory.createConnection(); - //Default to a queue with a default name if queue is null - replace with your own name from config etc - if (queue==null || queue.length()==0) - { - q = getSession(_connection).createQueue(Statics.QUEUE_NAME); - } - else - { - q = getSession(_connection).createQueue(queue); - } + //create a transactional session + Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //Queue is non-exclusive and not deleted when last consumer detaches + Destination destination = session.createQueue(_destinationName); //Create a consumer with a destination of our queue which will use defaults for prefetch etc - _consumer = getSession(_connection).createConsumer(q); + _consumer = session.createConsumer(destination); //give the message listener a name of it's own - _consumer.setMessageListener(new AMSMessageListener("MessageListener " + System.currentTimeMillis())); + _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis())); _connection.start(); } catch (Throwable t) { - _logger.error("Fatal error: " + t); + _log.error("Fatal error: " + t); t.printStackTrace(); } - _logger.info("Waiting for messages ..."); + _log.info("Waiting for messages ..."); //wait for messages and sleep to survive failover try @@ -148,10 +156,15 @@ public class Subscriber } catch (Exception e) { - _logger.warn("Exception while Subscriber sleeping",e); + _log.warn("Exception while Subscriber sleeping",e); } } + public void setDestinationName(String name) + { + _destinationName = name; + } + /* * stop consuming and close connection */ @@ -166,58 +179,7 @@ public class Subscriber } catch(JMSException j) { - _logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); - } - } - - /* - * Get a connection for our broker with failover by providing an array of hostdetails - * @param hostdetails - a delimited string of host1:port1;host2:port2 style connection details - * @param username - for connection to the broker - * @password - for connection to the broker - * @virtualpath - */ - protected Connection getConnectionWithFailover(String hostdetails, String username, String password, - String virtualPath) throws ConnectionException - { - if (_connection == null) - { - try - { - _connection = new AMQConnection(hostdetails,username,password,InetAddress.getLocalHost().getHostName(),virtualPath); - - //To use a url to get your connection create a string in this format and then get a connection with it - //String myurl = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',failover='roundrobin'"; - //_connection = new AMQConnectionFactory(url).createConnection(); - - return _connection; - } - catch (Exception e) - { - throw new ConnectionException(e.toString()); - } - } - else - { - return _connection; - } - } - - /* - * Creates a non-transacted session for consuming messages - * Using client acknowledge mode means messages removed from queue only once ack'd - * @param connection - to the broker - */ - protected Session getSession(Connection connection) throws JMSException - { - if (_session == null) - { - _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - return _session; - } - else - { - return _session; + _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); } } diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java index 64e31c2262..4e755e858f 100644 --- a/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java +++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java @@ -26,10 +26,6 @@ import org.apache.log4j.BasicConfigurator; /** * Allows you to simply start a subscriber - * Author: Marnie McCormack - * Date: 08-Aug-2006 - * Time: 12:05:52 - * Copyright JPMorgan Chase 2006 */ public class SubscriptionWrapper { @@ -47,9 +43,7 @@ public class SubscriptionWrapper { //note that for failover should set -Dhost=host1:port1;host2:port2 //Client will then failover in order i.e. connect to first host and failover to second and so on - _subscriber.subscribe(props.getProperty(Statics.HOST_PROPERTY), - props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY), - props.getProperty(Statics.VIRTUAL_PATH_PROPERTY), props.getProperty(Statics.QUEUE_PROPERTY)); + _subscriber.subscribe(); } //Stop subscribing now ... diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java b/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java index 1168f8822a..3a81a0224b 100644 --- a/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java +++ b/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java @@ -32,11 +32,10 @@ import org.apache.log4j.BasicConfigurator; public class TestAMSPubSub { private static final Logger _logger = Logger.getLogger(TestAMSPubSub.class); - private static final String _defaultPayloadPath = "C:/Requirements/examplexml/test.xml"; + private static final String _defaultPayloadPath = "/tmp"; private static Subscriber subscriber; - private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; /** * Test main for class using default of local file for message payload @@ -53,8 +52,8 @@ public class TestAMSPubSub { //create publisher and subscriber subscriber = new Subscriber(); - //subscribe to the topic - testPubSub.subscribe(args); + //subscribe + testPubSub.subscribe(); //publish a message if (args.length == 1) @@ -76,12 +75,9 @@ public class TestAMSPubSub { } - private void subscribe(String[] args) + private void subscribe() { - Properties props = System.getProperties(); - subscriber.subscribe(props.getProperty(Statics.HOST_PROPERTY), - props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY), - props.getProperty(Statics.VIRTUAL_PATH_PROPERTY),props.getProperty(Statics.QUEUE_PROPERTY)); + subscriber.subscribe(); } private void publish(String payloadPath) diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java b/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java index 95fd9975eb..f1a921e106 100644 --- a/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java +++ b/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java @@ -32,7 +32,7 @@ import org.apache.log4j.BasicConfigurator; public class TestMultSubscribers { private static final Logger _logger = Logger.getLogger(TestMultSubscribers.class); - private static final String _defaultPayloadPath = "C:/Requirements/examplexml/test.xml"; + private static final String _defaultPayloadPath = "/tmp"; private static Subscriber subscriber1; private static Subscriber subscriber2; @@ -84,12 +84,8 @@ public class TestMultSubscribers { private void subscribe(String[] args) { Properties props = System.getProperties(); - subscriber1.subscribe(props.getProperty(Statics.HOST_PROPERTY), - props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY), - props.getProperty(Statics.VIRTUAL_PATH_PROPERTY),props.getProperty(Statics.QUEUE_PROPERTY)); - subscriber2.subscribe(props.getProperty(Statics.HOST_PROPERTY), - props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY), - props.getProperty(Statics.VIRTUAL_PATH_PROPERTY),props.getProperty(Statics.QUEUE_PROPERTY)); + subscriber1.subscribe(); + subscriber2.subscribe(); } diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java b/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java index 9fa0116216..6ff6028ccd 100644 --- a/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java +++ b/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java @@ -29,9 +29,7 @@ import org.apache.log4j.BasicConfigurator; public class TestPublisher { private static final Logger _logger = Logger.getLogger(TestAMSPubSub.class); - private static final String _defaultPayloadPath = "C:/Requirements/examplexml/test.xml"; - - private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; + private static final String _defaultPayloadPath = "/tmp"; /** * Test main for class using default of local file for message payload diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestSubscriber.java b/java/client/src/test/java/org/apache/qpid/example/test/TestSubscriber.java deleted file mode 100644 index 7f0189886c..0000000000 --- a/java/client/src/test/java/org/apache/qpid/example/test/TestSubscriber.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.test; - -import org.apache.qpid.example.subscriber.Subscriber; -import org.apache.qpid.example.shared.Statics; - -import java.util.Properties; - -import org.apache.log4j.Logger; -import org.apache.log4j.BasicConfigurator; - - -public class TestSubscriber { - - private static final Logger _logger = Logger.getLogger(TestSubscriber.class); - private static final String _defaultPayloadPath = "C:/Requirements/examplexml/test.xml"; - - private static Subscriber subscriber; - - private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; - - /** - * Test main for class using default of local file for message payload - */ - public static void main(String[] args) - { - - //switch on logging - BasicConfigurator.configure(); - - TestSubscriber testSub = new TestSubscriber(); - - //create publisher and subscriber - subscriber = new Subscriber(); - - //subscribe to the topic - testSub.subscribe(args); - - //and exit as we're all done - //System.exit(0); - - } - - private void subscribe(String[] args) - { - Properties props = System.getProperties(); - subscriber.subscribe(props.getProperty(Statics.HOST_PROPERTY), - props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY), - props.getProperty(Statics.VIRTUAL_PATH_PROPERTY), props.getProperty(Statics.QUEUE_PROPERTY)); - } - -} - -- cgit v1.2.1