diff options
Diffstat (limited to 'java/client/example/src')
6 files changed, 450 insertions, 36 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index 8784d340da..b6544db995 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -18,18 +18,18 @@ */ package org.apache.qpid.example.publisher; -import org.apache.log4j.Logger; import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; -import javax.jms.*; - -import java.util.Properties; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; /** - * Class that sends heartbeat messages to allow monitoring of message consumption - * Sends regular (currently 20 seconds apart) heartbeat message + * Class that sends heartbeat messages to allow monitoring of message consumption Sends regular (currently 20 seconds + * apart) heartbeat message */ -public class MonitorMessageDispatcher { +public class MonitorMessageDispatcher +{ private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class); @@ -39,17 +39,18 @@ public class MonitorMessageDispatcher { /** * Easy entry point for running a message dispatcher for monitoring consumption + * * @param args */ public static void main(String[] args) { - //Switch on logging appropriately for your app BasicConfigurator.configure(); try { - while(true) + int i =0; + while (i < 1000) { try { @@ -62,9 +63,10 @@ public class MonitorMessageDispatcher { } //sleep for twenty seconds and then publish again - change if appropriate - Thread.sleep(20000); + //Thread.sleep(1000); + i++ ; } - catch(UndeliveredMessageException a) + catch (UndeliveredMessageException a) { //trigger application specific failure handling here _logger.error("Problem delivering monitor message"); @@ -72,7 +74,7 @@ public class MonitorMessageDispatcher { } } } - catch(Exception e) + catch (Exception e) { _logger.error("Error trying to dispatch AMS monitor message: " + e); System.exit(1); @@ -81,7 +83,7 @@ public class MonitorMessageDispatcher { { if (getMonitorPublisher() != null) { - getMonitorPublisher().cleanup(); + getMonitorPublisher().cleanup(); } } @@ -90,19 +92,24 @@ public class MonitorMessageDispatcher { /** * Publish heartbeat message + * * @throws JMSException * @throws UndeliveredMessageException */ public static void publish() throws JMSException, UndeliveredMessageException { //Send the message generated from the payload using the _publisher - getMonitorPublisher().sendImmediateMessage - (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); +// getMonitorPublisher().sendImmediateMessage +// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); + + getMonitorPublisher().sendMessage + (getMonitorPublisher()._session, + FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()), + DeliveryMode.PERSISTENT, false, true); + } - /** - * Cleanup publishers - */ + /** Cleanup publishers */ public static void cleanup() { if (getMonitorPublisher() != null) @@ -119,16 +126,16 @@ public class MonitorMessageDispatcher { //Returns a _publisher for the monitor queue private static MonitorPublisher getMonitorPublisher() { - if (_monitorPublisher != null) - { - return _monitorPublisher; - } + if (_monitorPublisher != null) + { + return _monitorPublisher; + } - //Create a _publisher using failover details and constant for monitor queue - _monitorPublisher = new MonitorPublisher(); + //Create a _publisher using failover details and constant for monitor queue + _monitorPublisher = new MonitorPublisher(); - _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); - return _monitorPublisher; + _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); + return _monitorPublisher; } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java index 233c3fea0a..a67b602e58 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java @@ -18,15 +18,17 @@ */ package org.apache.qpid.example.publisher; -import javax.jms.Message; +import org.apache.log4j.Logger; +import org.apache.qpid.client.BasicMessageProducer; + import javax.jms.DeliveryMode; import javax.jms.JMSException; -import org.apache.qpid.client.BasicMessageProducer; -import org.apache.log4j.Logger; +import javax.jms.Message; +import javax.jms.Session; /** - * Subclass of Publisher which uses QPID functionality to send a heartbeat message - * Note immediate flag not available via JMS MessageProducer + * Subclass of Publisher which uses QPID functionality to send a heartbeat message Note immediate flag not available via + * JMS MessageProducer */ public class MonitorPublisher extends Publisher { @@ -40,14 +42,45 @@ public class MonitorPublisher extends Publisher super(); } - /* - * Publishes a non-persistent message using transacted session - */ + /* + * Publishes a message using given details + */ + public boolean sendMessage(Session session, Message message, int deliveryMode, + boolean immediate, boolean commit) throws UndeliveredMessageException + { + try + { + _producer = (BasicMessageProducer) session.createProducer(_destination); + + _producer.send(message, deliveryMode, immediate); + + if (commit) + { + //commit the message send and close the transaction + _session.commit(); + } + + } + catch (JMSException e) + { + //Have to assume our commit failed but do not rollback here as channel closed + _log.error(e); + e.printStackTrace(); + throw new UndeliveredMessageException("Cannot deliver immediate message", e); + } + + _log.info(_name + " finished sending message: " + message); + return true; + } + + /* + * Publishes a non-persistent message using transacted session + */ public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException { try { - _producer = (BasicMessageProducer)_session.createProducer(_destination); + _producer = (BasicMessageProducer) _session.createProducer(_destination); //Send message via our producer which is not persistent and is immediate //NB: not available via jms interface MessageProducer @@ -62,7 +95,7 @@ public class MonitorPublisher extends Publisher //Have to assume our commit failed but do not rollback here as channel closed _log.error(e); e.printStackTrace(); - throw new UndeliveredMessageException("Cannot deliver immediate message",e); + throw new UndeliveredMessageException("Cannot deliver immediate message", e); } _log.info(_name + " finished sending message: " + message); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java new file mode 100644 index 0000000000..e32ee0ba73 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.naming.NamingException; + +/** + * An abstract base class that wraps up the creation of a JMS client utilising JNDI + */ +public abstract class Client +{ + protected ConnectionSetup _setup; + + protected Connection _connection; + protected Destination _destination; + protected Session _session; + + public Client(String destination) + { + if (destination == null) + { + destination = ConnectionSetup.TOPIC_JNDI_NAME; + } + + try + { + _setup = new ConnectionSetup(); + } + catch (NamingException e) + { + //ignore + } + + if (_setup != null) + { + try + { + _connection = _setup.getConnectionFactory().createConnection(); + _destination = _setup.getDestination(destination); + } + catch (JMSException e) + { + System.err.println(e.getMessage()); + } + } + } + + public abstract void start(); + +}
\ No newline at end of file diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java new file mode 100644 index 0000000000..c4edd9034f --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; + +/** + * This ConnectionSetup is a wrapper around JNDI it creates a number of entries. + * + * It is equivalent to a PropertyFile of value: + * + * connectionfactory.local=amqp://guest:guest@clientid/test?brokerlist='localhost' + * connectionfactory.vm=amqp://guest:guest@clientid/test?brokerlist='vm://:1' + * + * queue.queue=example.MyQueue + * topic.topic=example.hierarical.topic + * + */ +public class ConnectionSetup +{ + final static String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + final static String CONNECTION_JNDI_NAME = "local"; + final static String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='localhost'"; + + public static final String QUEUE_JNDI_NAME = "queue"; + final static String QUEUE_NAME = "example.MyQueue"; + + public static final String TOPIC_JNDI_NAME = "topic"; + final static String TOPIC_NAME = "example.hierarical.topic"; + + private Context _ctx; + + public ConnectionSetup() throws NamingException + { + + // Set the properties ... + Properties properties = new Properties(); + properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); + properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME); + properties.put("connectionfactory." + "vm", "amqp://guest:guest@clientid/test?brokerlist='vm://:1'"); + + properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME); + properties.put("topic." + TOPIC_JNDI_NAME, TOPIC_NAME); + // Create the initial context + _ctx = new InitialContext(properties); + + } + + public ConnectionSetup(Properties properties) throws NamingException + { + _ctx = new InitialContext(properties); + } + + public ConnectionFactory getConnectionFactory() + { + + // Perform the lookups + try + { + return (ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME); + } + catch (NamingException e) + { + //ignore + } + return null; + } + + public Destination getDestination(String jndiName) + { + // Perform the lookups + try + { + return (Destination) _ctx.lookup(jndiName); + } + catch (ClassCastException cce) + { + //ignore + } + catch (NamingException ne) + { + //ignore + } + return null; + } + + + public void close() + { + try + { + _ctx.close(); + } + catch (NamingException e) + { + //ignore + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java new file mode 100644 index 0000000000..dd936e429f --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * A simple Publisher example. + * + * The class can take two arguments. + * java Publisher <destination> <msgCount> + * Where: + * destination is either 'topic' or 'queue' (Default: topic) + * msgCount is the number of messages to send (Default : 100) + * + */ +public class Publisher extends Client +{ + int _msgCount; + + public Publisher(String destination, int msgCount) + { + super(destination); + _msgCount = msgCount; + } + + public void start() + { + try + { + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer _producer = _session.createProducer(_destination); + + for (int msgCount = 0; msgCount < _msgCount; msgCount++) + { + _producer.send(_session.createTextMessage("msg:" + msgCount)); + System.out.println("Sent:" + msgCount); + } + + System.out.println("Done."); + _connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + + public static void main(String[] args) + { + + String destination = args.length > 2 ? args[1] : null; + + int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; + + new Publisher(destination, msgCount).start(); + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java new file mode 100644 index 0000000000..f2d736701f --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.util.concurrent.CountDownLatch; + + +/** + * Simple client that listens for the specified number of msgs on the given Destinaton + * + * The class can take two arguments. + * java Subscriber <destination> <msgCount> + * Where: + * destination is either 'topic' or 'queue' (Default: topic) + * msgCount is the number of messages to send (Default : 100) + */ +public class Subscriber extends Client implements MessageListener +{ + + CountDownLatch _count; + + public Subscriber(String destination, int msgCount) + { + super(destination); + _count = new CountDownLatch(msgCount); + } + + + public void start() + { + try + { + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _session.createDurableSubscriber((Topic) _setup.getDestination(ConnectionSetup.TOPIC_JNDI_NAME), + "exampleClient").setMessageListener(this); + _connection.start(); + _count.await(); + + System.out.println("Done"); + + _connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + public static void main(String[] args) + { + String destination = args.length > 2 ? args[1] : null; + int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; + + new Subscriber(destination, msgCount).start(); + } + + public void onMessage(Message message) + { + try + { + _count.countDown(); + System.out.println("Received msg:" + ((TextMessage) message).getText()); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} |