diff options
Diffstat (limited to 'java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java')
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java | 176 |
1 files changed, 78 insertions, 98 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java index 710192f291..6b89567b83 100644 --- a/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java +++ b/java/client/src/test/java/org/apache/qpid/example/subscriber/Subscriber.java @@ -1,43 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.qpid.example.subscriber; import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; import javax.jms.*; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.naming.InitialContext; -import org.apache.qpid.example.shared.Statics; -import org.apache.qpid.example.shared.ConnectionException; - -import java.net.InetAddress; +import org.apache.qpid.example.shared.InitialContextHelper; /** * Subscriber which consumes messages from a queue - * Author: Marnie McCormack - * Date: 12-Sep-2006 - * Time: 09:41:07 - * Copyright JPMorgan Chase 2006 */ public class Subscriber { - private static final Logger _logger = Logger.getLogger(Subscriber.class); + private static final Logger _log = Logger.getLogger(Subscriber.class); protected static Connection _connection; protected static MessageConsumer _consumer; - protected static Session _session; + protected static InitialContextHelper _contextHelper; + + protected static AMQConnectionFactory _connectionFactory; + + protected String _destinationName; + public Subscriber() + { + try + { + //get an initial context from default properties + _contextHelper = new InitialContextHelper(null); + InitialContext ctx = _contextHelper.getInitialContext(); + //then create a connection using the AMQConnectionFactory + _connectionFactory = (AMQConnectionFactory) ctx.lookup("local"); + + //lookup queue name + _destinationName = (String) ctx.lookup("MyQueue"); + + } + catch (Exception e) + { + e.printStackTrace(); + _log.error(e); + } + } /* * Listener class that handles messages */ - public static class AMSMessageListener implements MessageListener + public static class ExampleMessageListener implements MessageListener { private String _name; - public AMSMessageListener(String name) + public ExampleMessageListener(String name) { _name = name; @@ -49,76 +88,63 @@ public class Subscriber */ public void onMessage(javax.jms.Message message) { - _logger.info(_name + " got message '" + message + "'"); + _log.info(_name + " got message '" + message + "'"); try { - //@TODO handle your message appropriately for your application here ? + //NB: Handle your message appropriately for your application here + //do some stuff - _logger.debug("Acknowledging recieved message"); + _log.debug("Acknowledging recieved message"); //Now acknowledge the message to clear it from our queue message.acknowledge(); } catch(JMSException j) { - _logger.error("JMSException trying to acknowledge message receipt"); + _log.error("JMSException trying to acknowledge message receipt"); j.printStackTrace(); } catch(Exception e) { - _logger.error("Unexpected exception trying to handle message"); + _log.error("Unexpected exception trying to handle message"); e.printStackTrace(); } } } /* - * Subscribes to AMS Queue and attaches listener - * @param hostdetails - for broker connection in host1:port1;host2:port2 format - * @param username - for connection to the broker - * @password - for connection to the broker - * @virtualpath + * Subscribes to example Queue and attaches listener */ - public void subscribe(String hostdetails, String username, String password, - String virtualPath, String queue) + public void subscribe() { - Queue q; - - _logger.info("Starting subscription ..."); - + _log.info("Starting subscription ..."); try { - //To enable failover simply specify more than one host:port combination for hostdetails - //Format is host1:port1;host2:port2 - _connection = getConnectionWithFailover(hostdetails,username,password,virtualPath); + _connection = _connectionFactory.createConnection(); - //Default to a queue with a default name if queue is null - replace with your own name from config etc - if (queue==null || queue.length()==0) - { - q = getSession(_connection).createQueue(Statics.QUEUE_NAME); - } - else - { - q = getSession(_connection).createQueue(queue); - } + //create a transactional session + Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //Queue is non-exclusive and not deleted when last consumer detaches + Destination destination = session.createQueue(_destinationName); //Create a consumer with a destination of our queue which will use defaults for prefetch etc - _consumer = getSession(_connection).createConsumer(q); + _consumer = session.createConsumer(destination); //give the message listener a name of it's own - _consumer.setMessageListener(new AMSMessageListener("MessageListener " + System.currentTimeMillis())); + _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis())); _connection.start(); } catch (Throwable t) { - _logger.error("Fatal error: " + t); + _log.error("Fatal error: " + t); t.printStackTrace(); } - _logger.info("Waiting for messages ..."); + _log.info("Waiting for messages ..."); //wait for messages and sleep to survive failover try @@ -130,10 +156,15 @@ public class Subscriber } catch (Exception e) { - _logger.warn("Exception while Subscriber sleeping",e); + _log.warn("Exception while Subscriber sleeping",e); } } + public void setDestinationName(String name) + { + _destinationName = name; + } + /* * stop consuming and close connection */ @@ -148,58 +179,7 @@ public class Subscriber } catch(JMSException j) { - _logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); - } - } - - /* - * Get a connection for our broker with failover by providing an array of hostdetails - * @param hostdetails - a delimited string of host1:port1;host2:port2 style connection details - * @param username - for connection to the broker - * @password - for connection to the broker - * @virtualpath - */ - protected Connection getConnectionWithFailover(String hostdetails, String username, String password, - String virtualPath) throws ConnectionException - { - if (_connection == null) - { - try - { - _connection = new AMQConnection(hostdetails,username,password,InetAddress.getLocalHost().getHostName(),virtualPath); - - //To use a url to get your connection create a string in this format and then get a connection with it - //String myurl = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',failover='roundrobin'"; - //_connection = new AMQConnectionFactory(url).createConnection(); - - return _connection; - } - catch (Exception e) - { - throw new ConnectionException(e.toString()); - } - } - else - { - return _connection; - } - } - - /* - * Creates a non-transacted session for consuming messages - * Using client acknowledge mode means messages removed from queue only once ack'd - * @param connection - to the broker - */ - protected Session getSession(Connection connection) throws JMSException - { - if (_session == null) - { - _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - return _session; - } - else - { - return _session; + _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); } } |