summaryrefslogtreecommitdiff
path: root/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher')
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java163
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java138
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java29
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java141
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java105
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java141
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java208
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java59
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java32
9 files changed, 1016 insertions, 0 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
new file mode 100644
index 0000000000..1849f733e9
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.publisher;
+
+import java.io.File;
+
+import javax.jms.JMSException;
+
+
+import org.apache.qpid.example.shared.FileUtils;
+import org.apache.qpid.example.shared.Statics;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Class that sends message files to the Publisher to distribute
+ * using files as input
+ * Must set properties for host in properties file or uses in vm broker
+ */
+public class FileMessageDispatcher
+{
+
+ protected static final Logger _logger = LoggerFactory.getLogger(FileMessageDispatcher.class);
+
+ protected static Publisher _publisher = null;
+
+ /**
+ * To use this main method you need to specify a path or file to use for input
+ * This class then uses file contents from the dir/file specified to generate
+ * messages to publish
+ * Intended to be a very simple way to get going with publishing using the broker
+ * @param args - must specify one value, the path to file(s) for publisher
+ */
+ public static void main(String[] args)
+ {
+
+ // Check command line args ok - must provide a path or file for us to dispatch
+ if (args.length == 0)
+ {
+ System.out.println("Usage: FileMessageDispatcher <filesToDispatch>" + "");
+ }
+ else
+ {
+ try
+ {
+ // publish message(s) from file(s) to configured queue
+ publish(args[0]);
+
+ // Move payload file(s) to archive location as no error
+ FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH));
+ }
+ catch (Exception e)
+ {
+ // log error and exit
+ _logger.error("Error trying to dispatch message: " + e);
+ System.exit(1);
+ }
+ finally
+ {
+ // clean up before exiting
+ if (getPublisher() != null)
+ {
+ getPublisher().cleanup();
+ }
+ }
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Finished dispatching message");
+ }
+
+ System.exit(0);
+ }
+
+ /**
+ * Publish the content of a file or files from a directory as messages
+ * @param path - from main args
+ * @throws JMSException
+ * @throws MessageFactoryException - if cannot create message from file content
+ */
+ public static void publish(String path) throws JMSException, MessageFactoryException
+ {
+ File tempFile = new File(path);
+ if (tempFile.isDirectory())
+ {
+ // while more files in dir publish them
+ File[] files = tempFile.listFiles();
+
+ if ((files == null) || (files.length == 0))
+ {
+ _logger.info("FileMessageDispatcher - No files to publish in input directory: " + tempFile);
+ }
+ else
+ {
+ for (File file : files)
+ {
+ // Create message factory passing in payload path
+ FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString());
+
+ // Send the message generated from the payload using the _publisher
+ getPublisher().sendMessage(factory.createEventMessage());
+
+ }
+ }
+ }
+ else
+ {
+ // handle a single file
+ // Create message factory passing in payload path
+ FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), tempFile.toString());
+
+ // Send the message generated from the payload using the _publisher
+ getPublisher().sendMessage(factory.createEventMessage());
+ }
+ }
+
+ /**
+ * Cleanup before exit
+ */
+ public static void cleanup()
+ {
+ if (getPublisher() != null)
+ {
+ getPublisher().cleanup();
+ }
+ }
+
+ /**
+ * @return A Publisher instance
+ */
+ private static Publisher getPublisher()
+ {
+ if (_publisher != null)
+ {
+ return _publisher;
+ }
+
+ // Create a _publisher
+ _publisher = new Publisher();
+
+ return _publisher;
+ }
+
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
new file mode 100644
index 0000000000..04339b2498
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.example.publisher;
+
+import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.example.shared.Statics;
+
+import java.io.*;
+import javax.jms.*;
+
+public class FileMessageFactory
+{
+ protected final Session _session;
+ protected final String _payload;
+ protected final String _filename;
+
+ /**
+ * Contructs and instance using a filename from which content will be used to create message
+ * @param session
+ * @param filename
+ * @throws MessageFactoryException
+ */
+ public FileMessageFactory(Session session, String filename) throws MessageFactoryException
+ {
+ try
+ {
+ _filename = filename;
+ _payload = FileUtils.readFileAsString(filename);
+ _session = session;
+ }
+ catch (Exception e)
+ {
+ MessageFactoryException mfe = new MessageFactoryException(e.toString(), e);
+ throw mfe;
+ }
+ }
+
+ /**
+ * Creates a text message and sets filename property on it
+ * The filename property is purely intended to provide visibility
+ * of file content passing trhough the broker using example classes
+ * @return Message - a TextMessage with content from file
+ * @throws JMSException
+ */
+ public Message createEventMessage() throws JMSException
+ {
+ TextMessage msg = _session.createTextMessage();
+ msg.setText(_payload);
+ msg.setStringProperty(Statics.FILENAME_PROPERTY, new File(_filename).getName());
+
+ return msg;
+ }
+
+ /**
+ * Creates message from a string for use by the monitor
+ * @param session
+ * @param textMsg - message content
+ * @return Message - TextMessage with content from String
+ * @throws JMSException
+ */
+ public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException
+ {
+ TextMessage msg = session.createTextMessage();
+ msg.setText(textMsg);
+
+ return msg;
+ }
+
+ public Message createShutdownMessage() throws JMSException
+ {
+ return _session.createTextMessage("SHUTDOWN");
+ }
+
+ public Message createReportRequestMessage() throws JMSException
+ {
+ return _session.createTextMessage("REPORT");
+ }
+
+ public Message createReportResponseMessage(String msg) throws JMSException
+ {
+ return _session.createTextMessage(msg);
+ }
+
+ public boolean isShutdown(Message m)
+ {
+ return checkText(m, "SHUTDOWN");
+ }
+
+ public boolean isReport(Message m)
+ {
+ return checkText(m, "REPORT");
+ }
+
+ public Object getReport(Message m)
+ {
+ try
+ {
+ return ((TextMessage) m).getText();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+
+ return e.toString();
+ }
+ }
+
+ private static boolean checkText(Message m, String s)
+ {
+ try
+ {
+ return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+
+ return false;
+ }
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
new file mode 100644
index 0000000000..d709da6432
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.publisher;
+
+public class MessageFactoryException extends Exception
+{
+ public MessageFactoryException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
new file mode 100644
index 0000000000..3d16e01af4
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.publisher;
+
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+
+/**
+ * Class that sends heartbeat messages to allow monitoring of message consumption Sends regular (currently 20 seconds
+ * apart) heartbeat message
+ */
+public class MonitorMessageDispatcher
+{
+
+ private static final Logger _logger = LoggerFactory.getLogger(MonitorMessageDispatcher.class);
+
+ protected static MonitorPublisher _monitorPublisher = null;
+
+ protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher";
+
+ /**
+ * Easy entry point for running a message dispatcher for monitoring consumption
+ * Sends 1000 messages with no delay
+ *
+ * @param args
+ */
+ public static void main(String[] args)
+ {
+ //Switch on logging appropriately for your app
+ try
+ {
+ int i =0;
+ while (i < 1000)
+ {
+ try
+ {
+ //endlessly publish messages to monitor queue
+ publish();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dispatched monitor message");
+ }
+
+ //sleep for twenty seconds and then publish again - change if appropriate
+ //Thread.sleep(1000);
+ i++ ;
+ }
+ catch (UndeliveredMessageException a)
+ {
+ //trigger application specific failure handling here
+ _logger.error("Problem delivering monitor message");
+ break;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error trying to dispatch AMS monitor message: " + e);
+ System.exit(1);
+ }
+ finally
+ {
+ if (getMonitorPublisher() != null)
+ {
+ getMonitorPublisher().cleanup();
+ }
+ }
+
+ System.exit(1);
+ }
+
+ /**
+ * Publish heartbeat message
+ *
+ * @throws JMSException
+ * @throws UndeliveredMessageException
+ */
+ public static void publish() throws JMSException, UndeliveredMessageException
+ {
+ //Send the message generated from the payload using the _publisher
+// getMonitorPublisher().sendImmediateMessage
+// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
+
+ getMonitorPublisher().sendMessage
+ (getMonitorPublisher()._session,
+ FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()),
+ DeliveryMode.PERSISTENT, false, true);
+
+ }
+
+ /** Cleanup publishers */
+ public static void cleanup()
+ {
+ if (getMonitorPublisher() != null)
+ {
+ getMonitorPublisher().cleanup();
+ }
+
+ if (getMonitorPublisher() != null)
+ {
+ getMonitorPublisher().cleanup();
+ }
+ }
+
+ //Returns a _publisher for the monitor queue
+ private static MonitorPublisher getMonitorPublisher()
+ {
+ if (_monitorPublisher != null)
+ {
+ return _monitorPublisher;
+ }
+
+ //Create a _publisher using failover details and constant for monitor queue
+ _monitorPublisher = new MonitorPublisher();
+
+ _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME);
+ return _monitorPublisher;
+ }
+
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
new file mode 100644
index 0000000000..750f57d9dc
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.publisher;
+
+import org.apache.qpid.client.BasicMessageProducer;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+/**
+ * Subclass of Publisher which uses QPID functionality to send a heartbeat message Note immediate flag not available via
+ * JMS MessageProducer
+ */
+public class MonitorPublisher extends Publisher
+{
+
+ private static final Logger _log = LoggerFactory.getLogger(Publisher.class);
+
+ BasicMessageProducer _producer;
+
+ public MonitorPublisher()
+ {
+ super();
+ }
+
+ /*
+ * Publishes a message using given details
+ */
+ public boolean sendMessage(Session session, Message message, int deliveryMode,
+ boolean immediate, boolean commit) throws UndeliveredMessageException
+ {
+ try
+ {
+ _producer = (BasicMessageProducer) session.createProducer(_destination);
+
+ _producer.send(message, deliveryMode, immediate);
+
+ if (commit)
+ {
+ //commit the message send and close the transaction
+ _session.commit();
+ }
+
+ }
+ catch (JMSException e)
+ {
+ //Have to assume our commit failed but do not rollback here as channel closed
+ _log.error("JMSException", e);
+ e.printStackTrace();
+ throw new UndeliveredMessageException("Cannot deliver immediate message", e);
+ }
+
+ _log.info(_name + " finished sending message: " + message);
+ return true;
+ }
+
+ /*
+ * Publishes a non-persistent message using transacted session
+ */
+ public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException
+ {
+ try
+ {
+ _producer = (BasicMessageProducer) _session.createProducer(_destination);
+
+ //Send message via our producer which is not persistent and is immediate
+ //NB: not available via jms interface MessageProducer
+ _producer.send(message, DeliveryMode.NON_PERSISTENT, true);
+
+ //commit the message send and close the transaction
+ _session.commit();
+
+ }
+ catch (JMSException e)
+ {
+ //Have to assume our commit failed but do not rollback here as channel closed
+ _log.error("JMSException", e);
+ e.printStackTrace();
+ throw new UndeliveredMessageException("Cannot deliver immediate message", e);
+ }
+
+ _log.info(_name + " finished sending message: " + message);
+ return true;
+ }
+}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java
new file mode 100644
index 0000000000..a92efe99ac
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.publisher;
+
+import java.io.File;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+
+import org.apache.qpid.example.shared.FileUtils;
+import org.apache.qpid.example.shared.Statics;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Class that sends parameterised number of message files to the Publisher
+ * Must set properties for host in properties file or uses in vm broker
+ */
+public class MultiMessageDispatcher
+{
+
+ protected static final Logger _logger = LoggerFactory.getLogger(FileMessageDispatcher.class);
+
+ protected static Publisher _publisher = null;
+
+ /**
+ * To use this main method you need to specify a path or file to use for input
+ * This class then uses file contents from the dir/file specified to generate
+ * messages to publish
+ * Intended to be a very simple way to get going with publishing using the broker
+ * @param args - must specify one value, the path to file(s) for publisher
+ */
+ public static void main(String[] args)
+ {
+
+ // Check command line args ok - must provide a path or file for us to dispatch
+ if (args.length < 2)
+ {
+ System.out.println("Usage: MultiMessageDispatcher <numberOfMessagesToSend> <topic(true|false)>" + "");
+ }
+ else
+ {
+ boolean topicPublisher = true;
+
+ try
+ {
+ // publish message(s)
+ topicPublisher = new Boolean(args[1]).booleanValue();
+ publish(new Integer(args[0]).intValue(),topicPublisher);
+
+ // Move payload file(s) to archive location as no error
+ FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH));
+ }
+ catch (Exception e)
+ {
+ // log error and exit
+ _logger.error("Error trying to dispatch message: " + e);
+ System.exit(1);
+ }
+ finally
+ {
+
+ cleanup(topicPublisher);
+ }
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Finished dispatching message");
+ }
+
+ System.exit(0);
+ }
+
+ /**
+ * Publish the content of a file or files from a directory as messages
+ * @param numMessages - from main args
+ * @throws javax.jms.JMSException
+ * @throws org.apache.qpid.example.publisher.MessageFactoryException - if cannot create message from file content
+ */
+ public static void publish(int numMessages, boolean topicPublisher) throws JMSException, MessageFactoryException
+ {
+ {
+ // Send the message generated from the payload using the _publisher
+ getPublisher(topicPublisher).sendMessage(numMessages);
+ }
+ }
+
+ /**
+ * Cleanup before exit
+ */
+ public static void cleanup(boolean topicPublisher)
+ {
+ if (getPublisher(topicPublisher) != null)
+ {
+ getPublisher(topicPublisher).cleanup();
+ }
+ }
+
+ /**
+ * @return A Publisher instance
+ */
+ private static Publisher getPublisher(boolean topic)
+ {
+ if (_publisher != null)
+ {
+ return _publisher;
+ }
+
+ if (!topic)
+ {
+ // Create a _publisher
+ _publisher = new Publisher();
+ }
+ else
+ {
+ _publisher = new TopicPublisher();
+ }
+ return _publisher;
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
new file mode 100644
index 0000000000..b5f44557a4
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.publisher;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+
+import javax.jms.*;
+
+import javax.naming.InitialContext;
+
+import org.apache.qpid.example.shared.InitialContextHelper;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+public class Publisher
+{
+ private static final Logger _log = LoggerFactory.getLogger(Publisher.class);
+
+ protected InitialContextHelper _contextHelper;
+
+ protected Connection _connection;
+
+ protected Session _session;
+
+ protected MessageProducer _producer;
+
+ protected String _destinationDir;
+
+ protected String _name = "Publisher";
+
+ protected Destination _destination;
+
+ protected static final String _defaultDestinationDir = "/tmp";
+
+ /**
+ * Creates a Publisher instance using properties from example.properties
+ * See InitialContextHelper for details of how context etc created
+ */
+ public Publisher()
+ {
+ try
+ {
+ //get an initial context from default properties
+ _contextHelper = new InitialContextHelper(null);
+ InitialContext ctx = _contextHelper.getInitialContext();
+
+ //then create a connection using the AMQConnectionFactory
+ AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local");
+ _connection = cf.createConnection();
+
+ _connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println("ExceptionListener caught: " + jmse);
+ //System.exit(0);
+ }
+ });
+
+ //create a transactional session
+ _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ //lookup the example queue and use it
+ //Queue is non-exclusive and not deleted when last consumer detaches
+ _destination = (Queue) ctx.lookup("MyQueue");
+
+ //create a message producer
+ _producer = _session.createProducer(_destination);
+
+ //set destination dir for files that have been processed
+ _destinationDir = _defaultDestinationDir;
+
+ _connection.start();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _log.error("Exception", e);
+ }
+ }
+
+ /**
+ * Creates and sends the number of messages specified in the param
+ */
+ public void sendMessage(int numMessages)
+ {
+ try
+ {
+ TextMessage txtMessage = _session.createTextMessage("msg");
+ for (int i=0;i<numMessages;i++)
+ {
+ sendMessage(txtMessage);
+ _log.info("Sent: " + i);
+ }
+ }
+ catch (JMSException j)
+ {
+ _log.error("Exception in sendMessage" + j);
+ }
+
+
+ }
+
+ /**
+ * Publishes a non-persistent message using transacted session
+ * Note that persistent is the default mode for send - so need to specify for transient
+ */
+ public boolean sendMessage(Message message)
+ {
+ try
+ {
+ //Send message via our producer which is not persistent
+ _producer.send(message, DeliveryMode.PERSISTENT, _producer.getPriority(), _producer.getTimeToLive());
+
+ //commit the message send and close the transaction
+ _session.commit();
+
+ }
+ catch (JMSException e)
+ {
+ //Have to assume our commit failed and rollback here
+ try
+ {
+ _session.rollback();
+ _log.error("JMSException", e);
+ e.printStackTrace();
+ return false;
+ }
+ catch (JMSException j)
+ {
+ _log.error("Unable to rollback publish transaction ",e);
+ return false;
+ }
+ }
+
+ //_log.info(_name + " finished sending message: " + message);
+ return true;
+ }
+
+ /**
+ * Cleanup resources before exit
+ */
+ public void cleanup()
+ {
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.stop();
+ _connection.close();
+ }
+ _connection = null;
+ _producer = null;
+ }
+ catch(Exception e)
+ {
+ _log.error("Error trying to cleanup publisher " + e);
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Exposes session
+ * @return Session
+ */
+ public Session getSession()
+ {
+ return _session;
+ }
+
+ public String getDestinationDir()
+ {
+ return _destinationDir;
+ }
+
+ public void setDestinationDir(String destinationDir)
+ {
+ _destinationDir = destinationDir;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public void setName(String _name) {
+ this._name = _name;
+ }
+}
+
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java
new file mode 100644
index 0000000000..8645e41101
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.publisher;
+
+import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.example.shared.InitialContextHelper;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import javax.jms.*;
+import javax.naming.InitialContext;
+
+/**
+ * Subclass of Publisher which sends messages to a topic destination defined in example.properties
+ */
+public class TopicPublisher extends Publisher
+{
+
+ private static final Logger _log = LoggerFactory.getLogger(Publisher.class);
+
+ public TopicPublisher()
+ {
+ super();
+
+ try
+ {
+ _contextHelper = new InitialContextHelper(null);
+ InitialContext ctx = _contextHelper.getInitialContext();
+
+ //lookup the example topic and use it
+ _destination = (Topic) ctx.lookup("MyTopic");
+
+ //create a message producer
+ _producer = _session.createProducer(_destination);
+ }
+ catch (Exception e)
+ {
+ //argh
+ _log.error("Exception trying to construct TopicPublisher" + e);
+ }
+
+ }
+} \ No newline at end of file
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
new file mode 100644
index 0000000000..245008b68a
--- /dev/null
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.publisher;
+
+/**
+ * Exception thrown by monitor when cannot send a message marked for immediate delivery
+ */
+public class UndeliveredMessageException extends Exception
+{
+ public UndeliveredMessageException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+}