summaryrefslogtreecommitdiff
path: root/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java')
-rw-r--r--java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java176
1 files changed, 78 insertions, 98 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java
index 710192f291..6b89567b83 100644
--- a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java
+++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java
@@ -1,43 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.qpid.example.subscriber;
import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.InitialContext;
-import org.apache.qpid.example.shared.Statics;
-import org.apache.qpid.example.shared.ConnectionException;
-
-import java.net.InetAddress;
+import org.apache.qpid.example.shared.InitialContextHelper;
/**
* Subscriber which consumes messages from a queue
- * Author: Marnie McCormack
- * Date: 12-Sep-2006
- * Time: 09:41:07
- * Copyright JPMorgan Chase 2006
*/
public class Subscriber
{
- private static final Logger _logger = Logger.getLogger(Subscriber.class);
+ private static final Logger _log = Logger.getLogger(Subscriber.class);
protected static Connection _connection;
protected static MessageConsumer _consumer;
- protected static Session _session;
+ protected static InitialContextHelper _contextHelper;
+
+ protected static AMQConnectionFactory _connectionFactory;
+
+ protected String _destinationName;
+ public Subscriber()
+ {
+ try
+ {
+ //get an initial context from default properties
+ _contextHelper = new InitialContextHelper(null);
+ InitialContext ctx = _contextHelper.getInitialContext();
+ //then create a connection using the AMQConnectionFactory
+ _connectionFactory = (AMQConnectionFactory) ctx.lookup("local");
+
+ //lookup queue name
+ _destinationName = (String) ctx.lookup("MyQueue");
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _log.error(e);
+ }
+ }
/*
* Listener class that handles messages
*/
- public static class AMSMessageListener implements MessageListener
+ public static class ExampleMessageListener implements MessageListener
{
private String _name;
- public AMSMessageListener(String name)
+ public ExampleMessageListener(String name)
{
_name = name;
@@ -49,76 +88,63 @@ public class Subscriber
*/
public void onMessage(javax.jms.Message message)
{
- _logger.info(_name + " got message '" + message + "'");
+ _log.info(_name + " got message '" + message + "'");
try
{
- //@TODO handle your message appropriately for your application here ?
+ //NB: Handle your message appropriately for your application here
+ //do some stuff
- _logger.debug("Acknowledging recieved message");
+ _log.debug("Acknowledging recieved message");
//Now acknowledge the message to clear it from our queue
message.acknowledge();
}
catch(JMSException j)
{
- _logger.error("JMSException trying to acknowledge message receipt");
+ _log.error("JMSException trying to acknowledge message receipt");
j.printStackTrace();
}
catch(Exception e)
{
- _logger.error("Unexpected exception trying to handle message");
+ _log.error("Unexpected exception trying to handle message");
e.printStackTrace();
}
}
}
/*
- * Subscribes to AMS Queue and attaches listener
- * @param hostdetails - for broker connection in host1:port1;host2:port2 format
- * @param username - for connection to the broker
- * @password - for connection to the broker
- * @virtualpath
+ * Subscribes to example Queue and attaches listener
*/
- public void subscribe(String hostdetails, String username, String password,
- String virtualPath, String queue)
+ public void subscribe()
{
- Queue q;
-
- _logger.info("Starting subscription ...");
-
+ _log.info("Starting subscription ...");
try
{
- //To enable failover simply specify more than one host:port combination for hostdetails
- //Format is host1:port1;host2:port2
- _connection = getConnectionWithFailover(hostdetails,username,password,virtualPath);
+ _connection = _connectionFactory.createConnection();
- //Default to a queue with a default name if queue is null - replace with your own name from config etc
- if (queue==null || queue.length()==0)
- {
- q = getSession(_connection).createQueue(Statics.QUEUE_NAME);
- }
- else
- {
- q = getSession(_connection).createQueue(queue);
- }
+ //create a transactional session
+ Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ //Queue is non-exclusive and not deleted when last consumer detaches
+ Destination destination = session.createQueue(_destinationName);
//Create a consumer with a destination of our queue which will use defaults for prefetch etc
- _consumer = getSession(_connection).createConsumer(q);
+ _consumer = session.createConsumer(destination);
//give the message listener a name of it's own
- _consumer.setMessageListener(new AMSMessageListener("MessageListener " + System.currentTimeMillis()));
+ _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis()));
_connection.start();
}
catch (Throwable t)
{
- _logger.error("Fatal error: " + t);
+ _log.error("Fatal error: " + t);
t.printStackTrace();
}
- _logger.info("Waiting for messages ...");
+ _log.info("Waiting for messages ...");
//wait for messages and sleep to survive failover
try
@@ -130,10 +156,15 @@ public class Subscriber
}
catch (Exception e)
{
- _logger.warn("Exception while Subscriber sleeping",e);
+ _log.warn("Exception while Subscriber sleeping",e);
}
}
+ public void setDestinationName(String name)
+ {
+ _destinationName = name;
+ }
+
/*
* stop consuming and close connection
*/
@@ -148,58 +179,7 @@ public class Subscriber
}
catch(JMSException j)
{
- _logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace());
- }
- }
-
- /*
- * Get a connection for our broker with failover by providing an array of hostdetails
- * @param hostdetails - a delimited string of host1:port1;host2:port2 style connection details
- * @param username - for connection to the broker
- * @password - for connection to the broker
- * @virtualpath
- */
- protected Connection getConnectionWithFailover(String hostdetails, String username, String password,
- String virtualPath) throws ConnectionException
- {
- if (_connection == null)
- {
- try
- {
- _connection = new AMQConnection(hostdetails,username,password,InetAddress.getLocalHost().getHostName(),virtualPath);
-
- //To use a url to get your connection create a string in this format and then get a connection with it
- //String myurl = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',failover='roundrobin'";
- //_connection = new AMQConnectionFactory(url).createConnection();
-
- return _connection;
- }
- catch (Exception e)
- {
- throw new ConnectionException(e.toString());
- }
- }
- else
- {
- return _connection;
- }
- }
-
- /*
- * Creates a non-transacted session for consuming messages
- * Using client acknowledge mode means messages removed from queue only once ack'd
- * @param connection - to the broker
- */
- protected Session getSession(Connection connection) throws JMSException
- {
- if (_session == null)
- {
- _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- return _session;
- }
- else
- {
- return _session;
+ _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace());
}
}