diff options
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java | 43 | ||||
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java (renamed from java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java) | 36 | ||||
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java | 33 | ||||
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java | 27 |
4 files changed, 91 insertions, 48 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java index ca3e5ce3f5..b199d41432 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java @@ -19,7 +19,7 @@ package org.apache.qpid.example.publisher; import org.apache.log4j.Logger; -import java.util.Properties; + import java.io.File; import org.apache.qpid.example.shared.FileUtils; @@ -34,12 +34,17 @@ import javax.jms.JMSException; */ public class FileMessageDispatcher { - private static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); - - private static Publisher _publisher = null; + protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); - private static final String DEFAULT_PUB_NAME = "Publisher"; + protected static Publisher _publisher = null; + /** + * To use this main method you need to specify a path or file to use for input + * This class then uses file contents from the dir/file specified to generate + * messages to publish + * Intended to be a very simple way to get going with publishing using the broker + * @param args - must specify one value, the path to file(s) for publisher + */ public static void main(String[] args) { @@ -52,7 +57,7 @@ public class FileMessageDispatcher { { try { - //publish message(s) from file(s) and send message to monitor queue + //publish message(s) from file(s) to configured queue publish(args[0]); //Move payload file(s) to archive location as no error @@ -60,7 +65,8 @@ public class FileMessageDispatcher { } catch(Exception e) { - System.err.println("Error trying to dispatch message: " + e); + //log error and exit + _logger.error("Error trying to dispatch message: " + e); System.exit(1); } finally @@ -81,8 +87,12 @@ public class FileMessageDispatcher { System.exit(0); } - - //Publish files or file as message + /** + * Publish the content of a file or files from a directory as messages + * @param path - from main args + * @throws JMSException + * @throws MessageFactoryException - if cannot create message from file content + */ public static void publish(String path) throws JMSException, MessageFactoryException { File tempFile = new File(path); @@ -100,7 +110,7 @@ public class FileMessageDispatcher { for (File file : files) { //Create message factory passing in payload path - MessageFactory factory = new MessageFactory(getPublisher().getSession(), file.toString()); + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString()); //Send the message generated from the payload using the _publisher getPublisher().sendMessage(factory.createEventMessage()); @@ -110,16 +120,18 @@ public class FileMessageDispatcher { } else { - //handle as single file + //handle a single file //Create message factory passing in payload path - MessageFactory factory = new MessageFactory(getPublisher().getSession(),tempFile.toString()); + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString()); //Send the message generated from the payload using the _publisher getPublisher().sendMessage(factory.createEventMessage()); } } - //cleanup publishers + /** + * Cleanup before exit + */ public static void cleanup() { if (getPublisher() != null) @@ -128,8 +140,8 @@ public class FileMessageDispatcher { } } - /* - * Returns a _publisher for a queue + /** + * @return A Publisher instance */ private static Publisher getPublisher() { @@ -141,7 +153,6 @@ public class FileMessageDispatcher { //Create a _publisher _publisher = new Publisher(); - _publisher.setName(DEFAULT_PUB_NAME); return _publisher; } diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java index f9944284c8..88bcbbbccb 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/MessageFactory.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -25,13 +25,19 @@ import org.apache.qpid.example.shared.Statics; import java.io.*; import javax.jms.*; -public class MessageFactory +public class FileMessageFactory { - private final Session _session; - private final String _payload; - private final String _filename; + protected final Session _session; + protected final String _payload; + protected final String _filename; - public MessageFactory(Session session, String filename) throws MessageFactoryException + /** + * Contructs and instance using a filename from which content will be used to create message + * @param session + * @param filename + * @throws MessageFactoryException + */ + public FileMessageFactory(Session session, String filename) throws MessageFactoryException { try { @@ -45,9 +51,13 @@ public class MessageFactory } } - /* - * Creates message and sets filename property on it - */ + /** + * Creates a text message and sets filename property on it + * The filename property is purely intended to provide visibility + * of file content passing trhough the broker using example classes + * @return Message - a TextMessage with content from file + * @throws JMSException + */ public Message createEventMessage() throws JMSException { TextMessage msg = _session.createTextMessage(); @@ -56,9 +66,13 @@ public class MessageFactory return msg; } - /* - * Creates message from a string for use by the monitor - */ + /** + * Creates message from a string for use by the monitor + * @param session + * @param textMsg - message content + * @return Message - TextMessage with content from String + * @throws JMSException + */ public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException { TextMessage msg = session.createTextMessage(); diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index 16b32da22a..8784d340da 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -20,8 +20,9 @@ package org.apache.qpid.example.publisher; import org.apache.log4j.Logger; import org.apache.log4j.BasicConfigurator; -import org.apache.qpid.example.shared.Statics; + import javax.jms.*; + import java.util.Properties; /** @@ -32,14 +33,18 @@ public class MonitorMessageDispatcher { private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class); - private static MonitorPublisher _monitorPublisher = null; + protected static MonitorPublisher _monitorPublisher = null; - private static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; + protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; + /** + * Easy entry point for running a message dispatcher for monitoring consumption + * @param args + */ public static void main(String[] args) { - //@TODO switch on logging appropriately at your app level + //Switch on logging appropriately for your app BasicConfigurator.configure(); try @@ -61,7 +66,7 @@ public class MonitorMessageDispatcher { } catch(UndeliveredMessageException a) { - //@TODO trigger application specific failure handling here + //trigger application specific failure handling here _logger.error("Problem delivering monitor message"); break; } @@ -69,8 +74,7 @@ public class MonitorMessageDispatcher { } catch(Exception e) { - - System.err.println("Error trying to dispatch AMS monitor message: " + e); + _logger.error("Error trying to dispatch AMS monitor message: " + e); System.exit(1); } finally @@ -84,15 +88,21 @@ public class MonitorMessageDispatcher { System.exit(1); } - //Publish heartbeat message + /** + * Publish heartbeat message + * @throws JMSException + * @throws UndeliveredMessageException + */ public static void publish() throws JMSException, UndeliveredMessageException { //Send the message generated from the payload using the _publisher getMonitorPublisher().sendImmediateMessage - (MessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); + (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); } - //cleanup publishers + /** + * Cleanup publishers + */ public static void cleanup() { if (getMonitorPublisher() != null) @@ -114,9 +124,6 @@ public class MonitorMessageDispatcher { return _monitorPublisher; } - //Create _publisher using system properties - Properties props = System.getProperties(); - //Create a _publisher using failover details and constant for monitor queue _monitorPublisher = new MonitorPublisher(); diff --git a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java index d64fd9b142..be42e0e413 100644 --- a/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java +++ b/java/client/src/test/java/org/apache/qpid/example/publisher/Publisher.java @@ -22,14 +22,14 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.jms.Session; - import javax.jms.JMSException; import javax.jms.Message; import javax.jms.DeliveryMode; import javax.jms.Queue; import javax.jms.MessageProducer; import javax.jms.Connection; +import javax.jms.Session; + import javax.naming.InitialContext; import org.apache.qpid.example.shared.InitialContextHelper; @@ -44,7 +44,7 @@ public class Publisher protected Session _session; - private MessageProducer _producer; + protected MessageProducer _producer; protected String _destinationDir; @@ -54,7 +54,10 @@ public class Publisher protected static final String _defaultDestinationDir = "/tmp"; - //constructor for use with a single host + /** + * Creates a Publisher instance using properties from example.properties + * See InitialContextHelper for details of how context etc created + */ public Publisher() { try @@ -68,7 +71,7 @@ public class Publisher _connection = cf.createConnection(); //create a transactional session - _session = (Session) _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //lookup the example queue and use it //Queue is non-exclusive and not deleted when last consumer detaches @@ -90,8 +93,9 @@ public class Publisher } /** - * Publishes a non-persistent message using transacted session - **/ + * Publishes a non-persistent message using transacted session + * Note that persistent is the default mode for send - so need to specify for transient + */ public boolean sendMessage(Message message) { try @@ -124,6 +128,9 @@ public class Publisher return true; } + /** + * Cleanup resources before exit + */ public void cleanup() { try @@ -138,11 +145,15 @@ public class Publisher } catch(Exception e) { - System.err.println("Error trying to cleanup publisher " + e); + _log.error("Error trying to cleanup publisher " + e); System.exit(1); } } + /** + * Exposes session + * @return Session + */ public Session getSession() { return _session; |