summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMarnie McCormack <marnie@apache.org>2006-11-24 12:12:43 +0000
committerMarnie McCormack <marnie@apache.org>2006-11-24 12:12:43 +0000
commitb442c78351bf330cf23b67e86aa17424d5a78966 (patch)
tree7e5c130cedb3167dafa78fe28d8b57ba7a5e9750 /java
parent80b543c3ae202e53236bf6132bd4b0ff3330b7d2 (diff)
downloadqpid-python-b442c78351bf330cf23b67e86aa17424d5a78966.tar.gz
Initial example class overhaul - still some way to go.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@478854 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java20
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactoryException.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java9
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/publisher/MonitorPublisher.java15
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java94
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/shared/ConnectionException.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/shared/ContextException.java73
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/shared/FileUtils.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/shared/InitialContextHelper.java78
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/shared/Statics.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/shared/example.properties21
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java51
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java8
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java158
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java8
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java14
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/test/TestSubscriber.java70
20 files changed, 293 insertions, 365 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
index f338a0b140..ca3e5ce3f5 100644
--- a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
+++ b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
@@ -30,11 +30,7 @@ import javax.jms.JMSException;
/**
* Class that sends message files to the Publisher to distribute
* using files as input
- * Must set system properties for host etc or amend and use config props
- * Author: Marnie McCormack
- * Date: 20-Jul-2006
- * Time: 09:56:56
- * Copyright JPMorgan Chase 2006
+ * Must set properties for host in properties file or uses in vm broker
*/
public class FileMessageDispatcher {
@@ -47,7 +43,7 @@ public class FileMessageDispatcher {
public static void main(String[] args)
{
- //Check command line args ok - must provide a path or file for us to run
+ //Check command line args ok - must provide a path or file for us to dispatch
if (args.length == 0)
{
System.err.println("Usage: FileMessageDispatcher <filesToDispatch>" + "");
@@ -134,8 +130,6 @@ public class FileMessageDispatcher {
/*
* Returns a _publisher for a queue
- * Using system properties to get connection info for now
- * Must set using -D the host, client, queue, user, pwd, virtual path, archive path
*/
private static Publisher getPublisher()
{
@@ -144,14 +138,8 @@ public class FileMessageDispatcher {
return _publisher;
}
- //Create _publisher using system properties
- Properties props = System.getProperties();
-
- //Create a _publisher using failover details
- _publisher = new Publisher(props.getProperty(Statics.HOST_PROPERTY),
- props.getProperty(Statics.CLIENT_PROPERTY), props.getProperty(Statics.QUEUE_PROPERTY),
- props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY),
- props.getProperty(Statics.VIRTUAL_PATH_PROPERTY), props.getProperty(Statics.ARCHIVE_PATH));
+ //Create a _publisher
+ _publisher = new Publisher();
_publisher.setName(DEFAULT_PUB_NAME);
return _publisher;
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactoryException.java
index f9dea268d2..34360d6708 100644
--- a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactoryException.java
+++ b/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactoryException.java
@@ -20,12 +20,6 @@ package org.apache.qpid.example.publisher;
import org.apache.log4j.Logger;
-/**
- * Author: Marnie McCormack
- * Date: 18-Jul-2006
- * Time: 11:13:23
- * Copyright JPMorgan Chase 2006
- */
public class MessageFactoryException extends Exception {
private int _errorCode;
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
index e47a66cbc1..16b32da22a 100644
--- a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
+++ b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
@@ -27,10 +27,6 @@ import java.util.Properties;
/**
* Class that sends heartbeat messages to allow monitoring of message consumption
* Sends regular (currently 20 seconds apart) heartbeat message
- * Author: Marnie McCormack
- * Date: 20-Jul-2006
- * Time: 09:56:56
- * Copyright JPMorgan Chase 2006
*/
public class MonitorMessageDispatcher {
@@ -122,10 +118,7 @@ public class MonitorMessageDispatcher {
Properties props = System.getProperties();
//Create a _publisher using failover details and constant for monitor queue
- _monitorPublisher = new MonitorPublisher(props.getProperty(Statics.HOST_PROPERTY),
- props.getProperty(Statics.CLIENT_PROPERTY), Statics.MONITOR_QUEUE,
- props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY),
- props.getProperty(Statics.VIRTUAL_PATH_PROPERTY), props.getProperty(Statics.ARCHIVE_PATH));
+ _monitorPublisher = new MonitorPublisher();
_monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME);
return _monitorPublisher;
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorPublisher.java
index 0c5e81a0d1..233c3fea0a 100644
--- a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorPublisher.java
+++ b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorPublisher.java
@@ -27,10 +27,6 @@ import org.apache.log4j.Logger;
/**
* Subclass of Publisher which uses QPID functionality to send a heartbeat message
* Note immediate flag not available via JMS MessageProducer
- * Author: Marnie McCormack
- * Date: 12-Sep-2006
- * Time: 09:41:07
- * Copyright JPMorgan Chase 2006
*/
public class MonitorPublisher extends Publisher
{
@@ -39,16 +35,9 @@ public class MonitorPublisher extends Publisher
BasicMessageProducer _producer;
- public MonitorPublisher(String host, int port, String clientID, String queueName,
- String user, String password, String virtualPath, String destinationDir)
+ public MonitorPublisher()
{
- super(host,port,clientID,queueName,user,password,virtualPath,destinationDir);
- }
-
- public MonitorPublisher(String hostdetails, String clientID, String queueName,
- String user, String password, String virtualPath, String destinationDir)
- {
- super(hostdetails,clientID,queueName,user,password,virtualPath,destinationDir);
+ super();
}
/*
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java
index 0e66599fc6..d64fd9b142 100644
--- a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java
+++ b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java
@@ -20,7 +20,7 @@ package org.apache.qpid.example.publisher;
import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.jms.Session;
@@ -30,15 +30,16 @@ import javax.jms.DeliveryMode;
import javax.jms.Queue;
import javax.jms.MessageProducer;
import javax.jms.Connection;
+import javax.naming.InitialContext;
-
-import org.apache.qpid.example.shared.ConnectionException;
-import org.apache.qpid.example.shared.Statics;
+import org.apache.qpid.example.shared.InitialContextHelper;
public class Publisher
{
private static final Logger _log = Logger.getLogger(Publisher.class);
+ protected InitialContextHelper _contextHelper;
+
protected Connection _connection;
protected Session _session;
@@ -51,62 +52,33 @@ public class Publisher
protected Queue _destination;
+ protected static final String _defaultDestinationDir = "/tmp";
+
//constructor for use with a single host
- public Publisher(String host, int port, String clientID, String queueName,
- String user, String password, String virtualPath, String destinationDir)
+ public Publisher()
{
try
{
- createConnection(host, port, clientID, user, password, virtualPath);
-
- //create a transactional session
- _session = (Session) _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ //get an initial context from default properties
+ _contextHelper = new InitialContextHelper(null);
+ InitialContext ctx = _contextHelper.getInitialContext();
- //now using a queue rather than a topic
- //AMQTopic destination = new AMQTopic(topicName);
- //Queue is non-exclusive and not deleted when last consumer detaches
- _destination = _session.createQueue(queueName);
-
- //create a message producer
- _producer = _session.createProducer(_destination);
-
- //set destination dir for files that have been processed
- _destinationDir = destinationDir;
-
- _connection.start();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- _log.error(e);
- }
- }
-
- //constructor that allows for multiple host details to be provided for failover
- public Publisher(String hostdetails, String clientID, String queueName,
- String user, String password, String virtualPath, String destinationDir)
- {
- try
- {
- if (queueName==null||queueName.length()==0)
- {
- queueName = Statics.QUEUE_NAME;
- }
- createConnectionWithFailover(hostdetails, clientID, user, password, virtualPath);
+ //then create a connection using the AMQConnectionFactory
+ AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local");
+ _connection = cf.createConnection();
//create a transactional session
_session = (Session) _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- //now using a queue rather than a topic
- //AMQTopic destination = new AMQTopic(topicName);
+ //lookup the example queue and use it
//Queue is non-exclusive and not deleted when last consumer detaches
- _destination = _session.createQueue(queueName);
+ _destination = _session.createQueue((String)ctx.lookup("MyQueue"));
//create a message producer
_producer = _session.createProducer(_destination);
//set destination dir for files that have been processed
- _destinationDir = destinationDir;
+ _destinationDir = _defaultDestinationDir;
_connection.start();
}
@@ -117,9 +89,9 @@ public class Publisher
}
}
- /*
+ /**
* Publishes a non-persistent message using transacted session
- */
+ **/
public boolean sendMessage(Message message)
{
try
@@ -186,34 +158,6 @@ public class Publisher
_destinationDir = destinationDir;
}
- //ONly using one set of host details
- private void createConnection(String host, int port, String clientID, String user, String password, String virtualPath)
- throws ConnectionException
- {
- try
- {
- _connection = new AMQConnection(host, port, user, password, clientID, virtualPath);
- }
- catch (Exception e)
- {
- throw new ConnectionException(e.toString());
- }
- }
-
- //Create connection with more than one set of host details for failover
- private void createConnectionWithFailover(String hostdetails, String clientID, String user, String password, String virtualPath)
- throws ConnectionException
- {
- try
- {
- _connection = new AMQConnection(hostdetails, user, password, clientID, virtualPath);
- }
- catch (Exception e)
- {
- throw new ConnectionException(e.toString());
- }
- }
-
public String getName()
{
return _name;
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/java/client/src/test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
index 2ac5ca7c85..3335833c2d 100644
--- a/java/client/src/test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
+++ b/java/client/src/test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
@@ -22,10 +22,6 @@ import org.apache.log4j.Logger;
/**
* Exception thrown by monitor when cannot send a message marked for immediate delivery
- * Author: Marnie McCormack
- * Date: 18-Jul-2006
- * Time: 11:13:23
- * Copyright JPMorgan Chase 2006
*/
public class UndeliveredMessageException extends Exception {
diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/ConnectionException.java b/java/client/src/test/java/org/apache/qpid/example/shared/ConnectionException.java
index e32ffe6b10..8723983862 100644
--- a/java/client/src/test/java/org/apache/qpid/example/shared/ConnectionException.java
+++ b/java/client/src/test/java/org/apache/qpid/example/shared/ConnectionException.java
@@ -20,12 +20,6 @@ package org.apache.qpid.example.shared;
import org.apache.log4j.Logger;
-/**
- * Author: Marnie McCormack
- * Date: 18-Jul-2006
- * Time: 11:13:23
- * Copyright JPMorgan Chase 2006
- */
public class ConnectionException extends Exception {
private int _errorCode;
diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/ContextException.java b/java/client/src/test/java/org/apache/qpid/example/shared/ContextException.java
new file mode 100644
index 0000000000..787cecd541
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/example/shared/ContextException.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.shared;
+
+import org.apache.log4j.Logger;
+
+public class ContextException extends Exception {
+
+ private int _errorCode;
+
+ public ContextException(String message)
+ {
+ super(message);
+ }
+
+ public ContextException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+
+ public ContextException(int errorCode, String msg, Throwable t)
+ {
+ super(msg + " [error code " + errorCode + ']', t);
+ _errorCode = errorCode;
+ }
+
+ public ContextException(int errorCode, String msg)
+ {
+ super(msg + " [error code " + errorCode + ']');
+ _errorCode = errorCode;
+ }
+
+ public ContextException(Logger logger, String msg, Throwable t)
+ {
+ this(msg, t);
+ logger.error(getMessage(), this);
+ }
+
+ public ContextException(Logger logger, String msg)
+ {
+ this(msg);
+ logger.error(getMessage(), this);
+ }
+
+ public ContextException(Logger logger, int errorCode, String msg)
+ {
+ this(errorCode, msg);
+ logger.error(getMessage(), this);
+ }
+
+ public int getErrorCode()
+ {
+ return _errorCode;
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/FileUtils.java b/java/client/src/test/java/org/apache/qpid/example/shared/FileUtils.java
index 8b0e6e7378..54446cb6a7 100644
--- a/java/client/src/test/java/org/apache/qpid/example/shared/FileUtils.java
+++ b/java/client/src/test/java/org/apache/qpid/example/shared/FileUtils.java
@@ -22,10 +22,6 @@ import java.io.*;
/**
* Class that provides file related utility methods for utility use
- * Author: Marnie McCormack
- * Date: 20-Jul-2006
- * Time: 08:17:16
- * Copyright JPMorgan Chase 2006
*/
public class FileUtils {
diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/src/test/java/org/apache/qpid/example/shared/InitialContextHelper.java
new file mode 100644
index 0000000000..b39892b688
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/example/shared/InitialContextHelper.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.shared;
+
+import org.apache.log4j.Logger;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+import java.io.InputStream;
+import java.io.IOException;
+
+/**
+ * Class that provides helper methods for JNDI
+ */
+public class InitialContextHelper {
+
+ public static final String _defaultPropertiesName = "example.properties";
+ protected static Properties _fileProperties;
+ protected static InitialContext _initialContext;
+ protected static final Logger _log = Logger.getLogger(InitialContextHelper.class);
+
+ public InitialContextHelper(String propertiesName) throws ContextException
+ {
+ try
+ {
+ if (propertiesName == null || propertiesName.length() == 0)
+ {
+ propertiesName = _defaultPropertiesName;
+ }
+
+ _fileProperties = new Properties();
+ ClassLoader cl = this.getClass().getClassLoader();
+
+ //NB: Need to change path to reflect package if moving classes around !
+ InputStream is = cl.getResourceAsStream("org/apache/qpid/example/shared/" + propertiesName);
+ _fileProperties.load(is);
+ _initialContext = new InitialContext(_fileProperties);
+ }
+ catch (IOException e)
+ {
+ throw new ContextException(_log, e.toString());
+ }
+ catch (NamingException n)
+ {
+ throw new ContextException(_log, n.toString());
+ }
+ }
+
+ public Properties getFileProperties()
+ {
+ return _fileProperties;
+ }
+
+ public InitialContext getInitialContext()
+ {
+ return _initialContext;
+ }
+
+}
diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/Statics.java b/java/client/src/test/java/org/apache/qpid/example/shared/Statics.java
index 7516cde29d..c056f8a7da 100644
--- a/java/client/src/test/java/org/apache/qpid/example/shared/Statics.java
+++ b/java/client/src/test/java/org/apache/qpid/example/shared/Statics.java
@@ -20,9 +20,6 @@ package org.apache.qpid.example.shared;
/**
* Constants used by AMS Publisher/Subscriber classes
- * Author: Marnie McCormack
- * Date: 18-Jul-2006
- * Time: 09:19:33
*/
public class Statics {
@@ -30,7 +27,7 @@ public class Statics {
public static final String QUEUE_NAME = "EXAMPLE_QUEUE";
- public static final String MONITOR_QUEUE = "MONITOR_QUEUE";
+ public static final String MONITOR_QUEUE_SUFFIX = "_MONITOR";
public static final String HOST_PROPERTY = "host";
diff --git a/java/client/src/test/java/org/apache/qpid/example/shared/example.properties b/java/client/src/test/java/org/apache/qpid/example/shared/example.properties
new file mode 100644
index 0000000000..82de41908f
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/example/shared/example.properties
@@ -0,0 +1,21 @@
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+
+# use the following property to configure the default connector
+#java.naming.provider.url - ignored.
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'
+
+# register some queues in JNDI using the form
+# queue.[jndiName] = [physicalName]
+queue.MyQueue = example.MyQueue
+
+# register some topics in JNDI using the form
+# topic.[jndiName] = [physicalName]
+topic.ibmStocks = stocks.nyse.ibm
+
+# Register an AMQP destination in JNDI
+# NOTE: Qpid currently only supports direct,topics and headers
+# destination.[jniName] = [BindingURL]
+destination.direct = direct://amq.direct//directQueue
diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
index 361dc16467..d6e020bf43 100644
--- a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
+++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
@@ -21,25 +21,27 @@ package org.apache.qpid.example.subscriber;
import org.apache.log4j.Logger;
import org.apache.qpid.example.shared.Statics;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.JMSException;
-import javax.jms.Queue;
+import javax.jms.*;
/**
* Subclass of Subscriber which consumes a heartbeat message
- * Author: Marnie McCormack
- * Date: 12-Sep-2006
- * Time: 09:41:07
- * Copyright JPMorgan Chase 2006
*/
public class MonitoredSubscriber extends Subscriber
{
+ protected String _monitorDestinationName;
+
private static final Logger _logger = Logger.getLogger(MonitoredSubscriber.class);
private static MessageConsumer _monitorConsumer;
+ public MonitoredSubscriber()
+ {
+ super();
+ //lookup queue name and append suffix
+ _monitorDestinationName = _destinationName + Statics.MONITOR_QUEUE_SUFFIX;
+ }
+
public static class MonitorMessageListener implements MessageListener
{
private String _name;
@@ -79,39 +81,32 @@ public class MonitoredSubscriber extends Subscriber
/*
* Subscribes to Queue and attaches additional monitor listener
- * @param hostdetails - for broker connection in host1:port1;host2:port2 format
- * @param username - for connection to the broker
- * @password - for connection to the broker
- * @virtualpath
*/
- public void subscribeAndMonitor(String hostdetails, String username, String password,
- String virtualPath, String queueName)
+ public void subscribeAndMonitor()
{
- Queue queue;
-
try
{
- //Create monitor comsumer for failover purposes
- if (queueName==null||queueName.length()==0)
- {
- queue = getSession(_connection).createQueue(Statics.QUEUE_NAME);
- }
- else
- {
- queue = getSession(_connection).createQueue(queueName);
- }
+ _connection = _connectionFactory.createConnection();
+
+ //create a transactional session
+ Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ //Queue is non-exclusive and not deleted when last consumer detaches
+ Destination destination = session.createQueue(_monitorDestinationName);
- _monitorConsumer = getSession(_connection).createConsumer(queue);
+ //Create a consumer with a destination of our queue which will use defaults for prefetch etc
+ _monitorConsumer = session.createConsumer(destination);
//give the monitor message listener a name of it's own
- _monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener("MonitorListener " + System.currentTimeMillis()));
+ _monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener
+ ("MonitorListener " + System.currentTimeMillis()));
MonitoredSubscriber._logger.info("Starting monitored subscription ...");
MonitoredSubscriber._connection.start();
//and now start ordinary consumption too
- subscribe(hostdetails,username,password,virtualPath,queueName);
+ subscribe();
}
catch (Throwable t)
{
diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
index 3bc1c5ea08..d6ec8bd5de 100644
--- a/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
+++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
@@ -25,10 +25,6 @@ import java.util.Properties;
/**
* Allows you to simply start a monitored subscriber
- * Author: Marnie McCormack
- * Date: 08-Aug-2006
- * Time: 12:05:52
- * Copyright JPMorgan Chase 2006
*/
public class MonitoredSubscriptionWrapper {
@@ -46,9 +42,7 @@ public class MonitoredSubscriptionWrapper {
//note that for failover should set -Dhost=host1:port1;host2:port2
//Client will then failover in order i.e. connect to first host and failover to second and so on
- _subscriber.subscribe(props.getProperty(Statics.HOST_PROPERTY),
- props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY),
- props.getProperty(Statics.VIRTUAL_PATH_PROPERTY), props.getProperty(Statics.QUEUE_NAME));
+ _subscriber.subscribe();
}
//Stop subscribing now ...
diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java
index 760eb2d108..6b89567b83 100644
--- a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java
+++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java
@@ -19,43 +19,64 @@
package org.apache.qpid.example.subscriber;
import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.InitialContext;
-import org.apache.qpid.example.shared.Statics;
-import org.apache.qpid.example.shared.ConnectionException;
-
-import java.net.InetAddress;
+import org.apache.qpid.example.shared.InitialContextHelper;
/**
* Subscriber which consumes messages from a queue
- * Author: Marnie McCormack
- * Date: 12-Sep-2006
- * Time: 09:41:07
- * Copyright JPMorgan Chase 2006
*/
public class Subscriber
{
- private static final Logger _logger = Logger.getLogger(Subscriber.class);
+ private static final Logger _log = Logger.getLogger(Subscriber.class);
protected static Connection _connection;
protected static MessageConsumer _consumer;
- protected static Session _session;
+ protected static InitialContextHelper _contextHelper;
+
+ protected static AMQConnectionFactory _connectionFactory;
+
+ protected String _destinationName;
+
+ public Subscriber()
+ {
+ try
+ {
+ //get an initial context from default properties
+ _contextHelper = new InitialContextHelper(null);
+ InitialContext ctx = _contextHelper.getInitialContext();
+
+ //then create a connection using the AMQConnectionFactory
+ _connectionFactory = (AMQConnectionFactory) ctx.lookup("local");
+ //lookup queue name
+ _destinationName = (String) ctx.lookup("MyQueue");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _log.error(e);
+ }
+ }
/*
* Listener class that handles messages
*/
- public static class AMSMessageListener implements MessageListener
+ public static class ExampleMessageListener implements MessageListener
{
private String _name;
- public AMSMessageListener(String name)
+ public ExampleMessageListener(String name)
{
_name = name;
@@ -67,76 +88,63 @@ public class Subscriber
*/
public void onMessage(javax.jms.Message message)
{
- _logger.info(_name + " got message '" + message + "'");
+ _log.info(_name + " got message '" + message + "'");
try
{
- //@TODO handle your message appropriately for your application here ?
+ //NB: Handle your message appropriately for your application here
+ //do some stuff
- _logger.debug("Acknowledging recieved message");
+ _log.debug("Acknowledging recieved message");
//Now acknowledge the message to clear it from our queue
message.acknowledge();
}
catch(JMSException j)
{
- _logger.error("JMSException trying to acknowledge message receipt");
+ _log.error("JMSException trying to acknowledge message receipt");
j.printStackTrace();
}
catch(Exception e)
{
- _logger.error("Unexpected exception trying to handle message");
+ _log.error("Unexpected exception trying to handle message");
e.printStackTrace();
}
}
}
/*
- * Subscribes to AMS Queue and attaches listener
- * @param hostdetails - for broker connection in host1:port1;host2:port2 format
- * @param username - for connection to the broker
- * @password - for connection to the broker
- * @virtualpath
+ * Subscribes to example Queue and attaches listener
*/
- public void subscribe(String hostdetails, String username, String password,
- String virtualPath, String queue)
+ public void subscribe()
{
- Queue q;
-
- _logger.info("Starting subscription ...");
-
+ _log.info("Starting subscription ...");
try
{
- //To enable failover simply specify more than one host:port combination for hostdetails
- //Format is host1:port1;host2:port2
- _connection = getConnectionWithFailover(hostdetails,username,password,virtualPath);
+ _connection = _connectionFactory.createConnection();
- //Default to a queue with a default name if queue is null - replace with your own name from config etc
- if (queue==null || queue.length()==0)
- {
- q = getSession(_connection).createQueue(Statics.QUEUE_NAME);
- }
- else
- {
- q = getSession(_connection).createQueue(queue);
- }
+ //create a transactional session
+ Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ //Queue is non-exclusive and not deleted when last consumer detaches
+ Destination destination = session.createQueue(_destinationName);
//Create a consumer with a destination of our queue which will use defaults for prefetch etc
- _consumer = getSession(_connection).createConsumer(q);
+ _consumer = session.createConsumer(destination);
//give the message listener a name of it's own
- _consumer.setMessageListener(new AMSMessageListener("MessageListener " + System.currentTimeMillis()));
+ _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis()));
_connection.start();
}
catch (Throwable t)
{
- _logger.error("Fatal error: " + t);
+ _log.error("Fatal error: " + t);
t.printStackTrace();
}
- _logger.info("Waiting for messages ...");
+ _log.info("Waiting for messages ...");
//wait for messages and sleep to survive failover
try
@@ -148,10 +156,15 @@ public class Subscriber
}
catch (Exception e)
{
- _logger.warn("Exception while Subscriber sleeping",e);
+ _log.warn("Exception while Subscriber sleeping",e);
}
}
+ public void setDestinationName(String name)
+ {
+ _destinationName = name;
+ }
+
/*
* stop consuming and close connection
*/
@@ -166,58 +179,7 @@ public class Subscriber
}
catch(JMSException j)
{
- _logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace());
- }
- }
-
- /*
- * Get a connection for our broker with failover by providing an array of hostdetails
- * @param hostdetails - a delimited string of host1:port1;host2:port2 style connection details
- * @param username - for connection to the broker
- * @password - for connection to the broker
- * @virtualpath
- */
- protected Connection getConnectionWithFailover(String hostdetails, String username, String password,
- String virtualPath) throws ConnectionException
- {
- if (_connection == null)
- {
- try
- {
- _connection = new AMQConnection(hostdetails,username,password,InetAddress.getLocalHost().getHostName(),virtualPath);
-
- //To use a url to get your connection create a string in this format and then get a connection with it
- //String myurl = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',failover='roundrobin'";
- //_connection = new AMQConnectionFactory(url).createConnection();
-
- return _connection;
- }
- catch (Exception e)
- {
- throw new ConnectionException(e.toString());
- }
- }
- else
- {
- return _connection;
- }
- }
-
- /*
- * Creates a non-transacted session for consuming messages
- * Using client acknowledge mode means messages removed from queue only once ack'd
- * @param connection - to the broker
- */
- protected Session getSession(Connection connection) throws JMSException
- {
- if (_session == null)
- {
- _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- return _session;
- }
- else
- {
- return _session;
+ _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace());
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
index 64e31c2262..4e755e858f 100644
--- a/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
+++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
@@ -26,10 +26,6 @@ import org.apache.log4j.BasicConfigurator;
/**
* Allows you to simply start a subscriber
- * Author: Marnie McCormack
- * Date: 08-Aug-2006
- * Time: 12:05:52
- * Copyright JPMorgan Chase 2006
*/
public class SubscriptionWrapper {
@@ -47,9 +43,7 @@ public class SubscriptionWrapper {
//note that for failover should set -Dhost=host1:port1;host2:port2
//Client will then failover in order i.e. connect to first host and failover to second and so on
- _subscriber.subscribe(props.getProperty(Statics.HOST_PROPERTY),
- props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY),
- props.getProperty(Statics.VIRTUAL_PATH_PROPERTY), props.getProperty(Statics.QUEUE_PROPERTY));
+ _subscriber.subscribe();
}
//Stop subscribing now ...
diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java b/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java
index 1168f8822a..3a81a0224b 100644
--- a/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java
+++ b/java/client/src/test/java/org/apache/qpid/example/test/TestAMSPubSub.java
@@ -32,11 +32,10 @@ import org.apache.log4j.BasicConfigurator;
public class TestAMSPubSub {
private static final Logger _logger = Logger.getLogger(TestAMSPubSub.class);
- private static final String _defaultPayloadPath = "C:/Requirements/examplexml/test.xml";
+ private static final String _defaultPayloadPath = "/tmp";
private static Subscriber subscriber;
- private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
/**
* Test main for class using default of local file for message payload
@@ -53,8 +52,8 @@ public class TestAMSPubSub {
//create publisher and subscriber
subscriber = new Subscriber();
- //subscribe to the topic
- testPubSub.subscribe(args);
+ //subscribe
+ testPubSub.subscribe();
//publish a message
if (args.length == 1)
@@ -76,12 +75,9 @@ public class TestAMSPubSub {
}
- private void subscribe(String[] args)
+ private void subscribe()
{
- Properties props = System.getProperties();
- subscriber.subscribe(props.getProperty(Statics.HOST_PROPERTY),
- props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY),
- props.getProperty(Statics.VIRTUAL_PATH_PROPERTY),props.getProperty(Statics.QUEUE_PROPERTY));
+ subscriber.subscribe();
}
private void publish(String payloadPath)
diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java b/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java
index 95fd9975eb..f1a921e106 100644
--- a/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java
+++ b/java/client/src/test/java/org/apache/qpid/example/test/TestMultSubscribers.java
@@ -32,7 +32,7 @@ import org.apache.log4j.BasicConfigurator;
public class TestMultSubscribers {
private static final Logger _logger = Logger.getLogger(TestMultSubscribers.class);
- private static final String _defaultPayloadPath = "C:/Requirements/examplexml/test.xml";
+ private static final String _defaultPayloadPath = "/tmp";
private static Subscriber subscriber1;
private static Subscriber subscriber2;
@@ -84,12 +84,8 @@ public class TestMultSubscribers {
private void subscribe(String[] args)
{
Properties props = System.getProperties();
- subscriber1.subscribe(props.getProperty(Statics.HOST_PROPERTY),
- props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY),
- props.getProperty(Statics.VIRTUAL_PATH_PROPERTY),props.getProperty(Statics.QUEUE_PROPERTY));
- subscriber2.subscribe(props.getProperty(Statics.HOST_PROPERTY),
- props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY),
- props.getProperty(Statics.VIRTUAL_PATH_PROPERTY),props.getProperty(Statics.QUEUE_PROPERTY));
+ subscriber1.subscribe();
+ subscriber2.subscribe();
}
diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java b/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java
index 9fa0116216..6ff6028ccd 100644
--- a/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java
+++ b/java/client/src/test/java/org/apache/qpid/example/test/TestPublisher.java
@@ -29,9 +29,7 @@ import org.apache.log4j.BasicConfigurator;
public class TestPublisher {
private static final Logger _logger = Logger.getLogger(TestAMSPubSub.class);
- private static final String _defaultPayloadPath = "C:/Requirements/examplexml/test.xml";
-
- private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+ private static final String _defaultPayloadPath = "/tmp";
/**
* Test main for class using default of local file for message payload
diff --git a/java/client/src/test/java/org/apache/qpid/example/test/TestSubscriber.java b/java/client/src/test/java/org/apache/qpid/example/test/TestSubscriber.java
deleted file mode 100644
index 7f0189886c..0000000000
--- a/java/client/src/test/java/org/apache/qpid/example/test/TestSubscriber.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.example.test;
-
-import org.apache.qpid.example.subscriber.Subscriber;
-import org.apache.qpid.example.shared.Statics;
-
-import java.util.Properties;
-
-import org.apache.log4j.Logger;
-import org.apache.log4j.BasicConfigurator;
-
-
-public class TestSubscriber {
-
- private static final Logger _logger = Logger.getLogger(TestSubscriber.class);
- private static final String _defaultPayloadPath = "C:/Requirements/examplexml/test.xml";
-
- private static Subscriber subscriber;
-
- private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
-
- /**
- * Test main for class using default of local file for message payload
- */
- public static void main(String[] args)
- {
-
- //switch on logging
- BasicConfigurator.configure();
-
- TestSubscriber testSub = new TestSubscriber();
-
- //create publisher and subscriber
- subscriber = new Subscriber();
-
- //subscribe to the topic
- testSub.subscribe(args);
-
- //and exit as we're all done
- //System.exit(0);
-
- }
-
- private void subscribe(String[] args)
- {
- Properties props = System.getProperties();
- subscriber.subscribe(props.getProperty(Statics.HOST_PROPERTY),
- props.getProperty(Statics.USER_PROPERTY), props.getProperty(Statics.PWD_PROPERTY),
- props.getProperty(Statics.VIRTUAL_PATH_PROPERTY), props.getProperty(Statics.QUEUE_PROPERTY));
- }
-
-}
-