diff options
author | Marnie McCormack <marnie@apache.org> | 2009-10-23 15:51:22 +0000 |
---|---|---|
committer | Marnie McCormack <marnie@apache.org> | 2009-10-23 15:51:22 +0000 |
commit | 6d088ecfd329afaa0966d05d55db7ebef3728359 (patch) | |
tree | 2b2d24579a6be4b3d2575d38afbf9d93b2620549 | |
parent | 95a888c20d9a0598620014e1b34f591a9fba5c9a (diff) | |
download | qpid-python-6d088ecfd329afaa0966d05d55db7ebef3728359.tar.gz |
Fix for large files in FileMessageFactory, changes to Publisher to make topic publication easier, added MultiMessageDispatcher class to allow parameterised sending of messages to a queue or a topic
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@829102 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 181 insertions, 13 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java index 1240284a56..04339b2498 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -19,7 +19,7 @@ package org.apache.qpid.example.publisher; -import org.apache.qpid.example.shared.FileUtils; +import org.apache.qpid.util.FileUtils; import org.apache.qpid.example.shared.Statics; import java.io.*; @@ -42,10 +42,10 @@ public class FileMessageFactory try { _filename = filename; - _payload = FileUtils.getFileContent(filename); + _payload = FileUtils.readFileAsString(filename); _session = session; } - catch (IOException e) + catch (Exception e) { MessageFactoryException mfe = new MessageFactoryException(e.toString(), e); throw mfe; diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java new file mode 100644 index 0000000000..a92efe99ac --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.publisher; + +import java.io.File; + +import javax.jms.JMSException; +import javax.jms.TextMessage; + + +import org.apache.qpid.example.shared.FileUtils; +import org.apache.qpid.example.shared.Statics; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +/** + * Class that sends parameterised number of message files to the Publisher + * Must set properties for host in properties file or uses in vm broker + */ +public class MultiMessageDispatcher +{ + + protected static final Logger _logger = LoggerFactory.getLogger(FileMessageDispatcher.class); + + protected static Publisher _publisher = null; + + /** + * To use this main method you need to specify a path or file to use for input + * This class then uses file contents from the dir/file specified to generate + * messages to publish + * Intended to be a very simple way to get going with publishing using the broker + * @param args - must specify one value, the path to file(s) for publisher + */ + public static void main(String[] args) + { + + // Check command line args ok - must provide a path or file for us to dispatch + if (args.length < 2) + { + System.out.println("Usage: MultiMessageDispatcher <numberOfMessagesToSend> <topic(true|false)>" + ""); + } + else + { + boolean topicPublisher = true; + + try + { + // publish message(s) + topicPublisher = new Boolean(args[1]).booleanValue(); + publish(new Integer(args[0]).intValue(),topicPublisher); + + // Move payload file(s) to archive location as no error + FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH)); + } + catch (Exception e) + { + // log error and exit + _logger.error("Error trying to dispatch message: " + e); + System.exit(1); + } + finally + { + + cleanup(topicPublisher); + } + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Finished dispatching message"); + } + + System.exit(0); + } + + /** + * Publish the content of a file or files from a directory as messages + * @param numMessages - from main args + * @throws javax.jms.JMSException + * @throws org.apache.qpid.example.publisher.MessageFactoryException - if cannot create message from file content + */ + public static void publish(int numMessages, boolean topicPublisher) throws JMSException, MessageFactoryException + { + { + // Send the message generated from the payload using the _publisher + getPublisher(topicPublisher).sendMessage(numMessages); + } + } + + /** + * Cleanup before exit + */ + public static void cleanup(boolean topicPublisher) + { + if (getPublisher(topicPublisher) != null) + { + getPublisher(topicPublisher).cleanup(); + } + } + + /** + * @return A Publisher instance + */ + private static Publisher getPublisher(boolean topic) + { + if (_publisher != null) + { + return _publisher; + } + + if (!topic) + { + // Create a _publisher + _publisher = new Publisher(); + } + else + { + _publisher = new TopicPublisher(); + } + return _publisher; + } + +}
\ No newline at end of file diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java index 87fc543dbe..b5f44557a4 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java @@ -20,13 +20,7 @@ package org.apache.qpid.example.publisher; import org.apache.qpid.client.AMQConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.DeliveryMode; -import javax.jms.Queue; -import javax.jms.MessageProducer; -import javax.jms.Connection; -import javax.jms.Session; +import javax.jms.*; import javax.naming.InitialContext; @@ -50,7 +44,7 @@ public class Publisher protected String _name = "Publisher"; - protected Queue _destination; + protected Destination _destination; protected static final String _defaultDestinationDir = "/tmp"; @@ -70,6 +64,17 @@ public class Publisher AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local"); _connection = cf.createConnection(); + _connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + // The connection may have broken invoke reconnect code if available. + System.err.println("ExceptionListener caught: " + jmse); + //System.exit(0); + } + }); + //create a transactional session _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); @@ -93,6 +98,28 @@ public class Publisher } /** + * Creates and sends the number of messages specified in the param + */ + public void sendMessage(int numMessages) + { + try + { + TextMessage txtMessage = _session.createTextMessage("msg"); + for (int i=0;i<numMessages;i++) + { + sendMessage(txtMessage); + _log.info("Sent: " + i); + } + } + catch (JMSException j) + { + _log.error("Exception in sendMessage" + j); + } + + + } + + /** * Publishes a non-persistent message using transacted session * Note that persistent is the default mode for send - so need to specify for transient */ @@ -101,7 +128,7 @@ public class Publisher try { //Send message via our producer which is not persistent - _producer.send(message, DeliveryMode.NON_PERSISTENT, _producer.getPriority(), _producer.getTimeToLive()); + _producer.send(message, DeliveryMode.PERSISTENT, _producer.getPriority(), _producer.getTimeToLive()); //commit the message send and close the transaction _session.commit(); @@ -124,7 +151,7 @@ public class Publisher } } - _log.info(_name + " finished sending message: " + message); + //_log.info(_name + " finished sending message: " + message); return true; } |