diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-03-10 19:22:10 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-03-10 19:22:10 +0000 |
commit | 4eaa4e42093e5524d9552d8fa312c214524b6bb4 (patch) | |
tree | a251d57ee92d9c779fe4455c583be0ed90e69a43 /qpid/java/client/example/src/main/java/org | |
parent | 92be7e8f3163c048a8642d2deeaa921bbb65dc9c (diff) | |
download | qpid-python-rg-amqp-1-0-sandbox.tar.gz |
NO-JIRA : AMQP-1-0 sandbox updates - merge from trunkrg-amqp-1-0-sandbox
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1299257 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/example/src/main/java/org')
28 files changed, 49 insertions, 2744 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java index b43031ad23..28e1d5a87e 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java @@ -27,8 +27,6 @@ import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; -import org.slf4j.Logger; public class Drain extends OptionParser { @@ -50,13 +48,13 @@ public class Drain extends OptionParser static { - optDefs.add(BROKER); - optDefs.add(HELP); - optDefs.add(TIMEOUT); - optDefs.add(FOREVER); - optDefs.add(COUNT); - optDefs.add(CON_OPTIONS); - optDefs.add(BROKER_OPTIONS); + addOption(BROKER); + addOption(HELP); + addOption(TIMEOUT); + addOption(FOREVER); + addOption(COUNT); + addOption(CON_OPTIONS); + addOption(BROKER_OPTIONS); } public Drain(String[] args, String usage, String desc) throws Exception @@ -66,7 +64,7 @@ public class Drain extends OptionParser Connection con = createConnection(); con.start(); Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); - Destination dest = new AMQAnyDestination(address); + Destination dest = new AMQAnyDestination(getAddress()); MessageConsumer consumer = ssn.createConsumer(dest); Message msg; diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java index f4e17c5c4c..6aa12f07fa 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java @@ -68,12 +68,12 @@ public class OptionParser String.class); - protected Map<String,Object> optMap = new HashMap<String,Object>(); - protected static final List<Option> optDefs = new ArrayList<Option>(); + private Map<String,Object> optMap = new HashMap<String,Object>(); + private static final List<Option> optDefs = new ArrayList<Option>(); - protected String usage; - protected String desc; - protected String address; + private String usage; + private String desc; + private String address; public OptionParser(String[] args, String usage, String desc) { @@ -147,8 +147,8 @@ public class OptionParser for (Option option: optDefs) { - if ((op.startsWith("-") && option.shortForm != null && option.shortForm.equals(key)) || - (op.startsWith("--") && option.longForm != null && option.longForm.equals(key)) ) + if ((op.startsWith("-") && option.getShortForm() != null && option.getShortForm().equals(key)) || + (op.startsWith("--") && option.getLongForm() != null && option.getLongForm().equals(key)) ) { match = true; break; @@ -205,7 +205,9 @@ public class OptionParser if (op.startsWith("'")) { if (!op.endsWith("'")) + { throw new IllegalArgumentException(" The option " + op + " needs to be inside quotes"); + } return op.substring(1,op.length() -1); } @@ -217,18 +219,18 @@ public class OptionParser protected boolean containsOp(Option op) { - return optMap.containsKey(op.shortForm) || optMap.containsKey(op.longForm); + return optMap.containsKey(op.getShortForm()) || optMap.containsKey(op.getLongForm()); } protected String getOp(Option op) { - if (optMap.containsKey(op.shortForm)) + if (optMap.containsKey(op.getShortForm())) { - return (String)optMap.get(op.shortForm); + return (String)optMap.get(op.getShortForm()); } - else if (optMap.containsKey(op.longForm)) + else if (optMap.containsKey(op.getLongForm())) { - return (String)optMap.get(op.longForm); + return (String)optMap.get(op.getLongForm()); } else { @@ -281,15 +283,25 @@ public class OptionParser Connection con = new AMQConnection(buf.toString()); return con; } - + + public static void addOption(Option opt) + { + optDefs.add(opt); + } + + protected String getAddress() + { + return address; + } + static class Option { - private String shortForm; - private String longForm; - private String desc; - private String valueLabel; - private String defaultValue; - private Class type; + private final String shortForm; + private final String longForm; + private final String desc; + private final String valueLabel; + private final String defaultValue; + private final Class type; public Option(String shortForm, String longForm, String desc, String valueLabel, String defaultValue, Class type) diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java index 5da319a658..61ff2dfc19 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java @@ -69,15 +69,15 @@ public class Spout extends OptionParser static { - optDefs.add(BROKER); - optDefs.add(HELP); - optDefs.add(TIMEOUT); - optDefs.add(COUNT); - optDefs.add(MSG_PROPERTY); - optDefs.add(MAP_ENTRY); - optDefs.add(CONTENT); - optDefs.add(CON_OPTIONS); - optDefs.add(BROKER_OPTIONS); + addOption(BROKER); + addOption(HELP); + addOption(TIMEOUT); + addOption(COUNT); + addOption(MSG_PROPERTY); + addOption(MAP_ENTRY); + addOption(CONTENT); + addOption(CON_OPTIONS); + addOption(BROKER_OPTIONS); } public Spout(String[] args, String usage, String desc) throws Exception @@ -87,7 +87,7 @@ public class Spout extends OptionParser Connection con = createConnection(); con.start(); Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); - Destination dest = new AMQAnyDestination(address); + Destination dest = new AMQAnyDestination(getAddress()); MessageProducer producer = ssn.createProducer(dest); int count = Integer.parseInt(getOp(COUNT)); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java deleted file mode 100644 index 1849f733e9..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.publisher; - -import java.io.File; - -import javax.jms.JMSException; - - -import org.apache.qpid.example.shared.FileUtils; -import org.apache.qpid.example.shared.Statics; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -/** - * Class that sends message files to the Publisher to distribute - * using files as input - * Must set properties for host in properties file or uses in vm broker - */ -public class FileMessageDispatcher -{ - - protected static final Logger _logger = LoggerFactory.getLogger(FileMessageDispatcher.class); - - protected static Publisher _publisher = null; - - /** - * To use this main method you need to specify a path or file to use for input - * This class then uses file contents from the dir/file specified to generate - * messages to publish - * Intended to be a very simple way to get going with publishing using the broker - * @param args - must specify one value, the path to file(s) for publisher - */ - public static void main(String[] args) - { - - // Check command line args ok - must provide a path or file for us to dispatch - if (args.length == 0) - { - System.out.println("Usage: FileMessageDispatcher <filesToDispatch>" + ""); - } - else - { - try - { - // publish message(s) from file(s) to configured queue - publish(args[0]); - - // Move payload file(s) to archive location as no error - FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH)); - } - catch (Exception e) - { - // log error and exit - _logger.error("Error trying to dispatch message: " + e); - System.exit(1); - } - finally - { - // clean up before exiting - if (getPublisher() != null) - { - getPublisher().cleanup(); - } - } - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("Finished dispatching message"); - } - - System.exit(0); - } - - /** - * Publish the content of a file or files from a directory as messages - * @param path - from main args - * @throws JMSException - * @throws MessageFactoryException - if cannot create message from file content - */ - public static void publish(String path) throws JMSException, MessageFactoryException - { - File tempFile = new File(path); - if (tempFile.isDirectory()) - { - // while more files in dir publish them - File[] files = tempFile.listFiles(); - - if ((files == null) || (files.length == 0)) - { - _logger.info("FileMessageDispatcher - No files to publish in input directory: " + tempFile); - } - else - { - for (File file : files) - { - // Create message factory passing in payload path - FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString()); - - // Send the message generated from the payload using the _publisher - getPublisher().sendMessage(factory.createEventMessage()); - - } - } - } - else - { - // handle a single file - // Create message factory passing in payload path - FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), tempFile.toString()); - - // Send the message generated from the payload using the _publisher - getPublisher().sendMessage(factory.createEventMessage()); - } - } - - /** - * Cleanup before exit - */ - public static void cleanup() - { - if (getPublisher() != null) - { - getPublisher().cleanup(); - } - } - - /** - * @return A Publisher instance - */ - private static Publisher getPublisher() - { - if (_publisher != null) - { - return _publisher; - } - - // Create a _publisher - _publisher = new Publisher(); - - return _publisher; - } - -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java deleted file mode 100644 index 04339b2498..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.example.publisher; - -import org.apache.qpid.util.FileUtils; -import org.apache.qpid.example.shared.Statics; - -import java.io.*; -import javax.jms.*; - -public class FileMessageFactory -{ - protected final Session _session; - protected final String _payload; - protected final String _filename; - - /** - * Contructs and instance using a filename from which content will be used to create message - * @param session - * @param filename - * @throws MessageFactoryException - */ - public FileMessageFactory(Session session, String filename) throws MessageFactoryException - { - try - { - _filename = filename; - _payload = FileUtils.readFileAsString(filename); - _session = session; - } - catch (Exception e) - { - MessageFactoryException mfe = new MessageFactoryException(e.toString(), e); - throw mfe; - } - } - - /** - * Creates a text message and sets filename property on it - * The filename property is purely intended to provide visibility - * of file content passing trhough the broker using example classes - * @return Message - a TextMessage with content from file - * @throws JMSException - */ - public Message createEventMessage() throws JMSException - { - TextMessage msg = _session.createTextMessage(); - msg.setText(_payload); - msg.setStringProperty(Statics.FILENAME_PROPERTY, new File(_filename).getName()); - - return msg; - } - - /** - * Creates message from a string for use by the monitor - * @param session - * @param textMsg - message content - * @return Message - TextMessage with content from String - * @throws JMSException - */ - public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException - { - TextMessage msg = session.createTextMessage(); - msg.setText(textMsg); - - return msg; - } - - public Message createShutdownMessage() throws JMSException - { - return _session.createTextMessage("SHUTDOWN"); - } - - public Message createReportRequestMessage() throws JMSException - { - return _session.createTextMessage("REPORT"); - } - - public Message createReportResponseMessage(String msg) throws JMSException - { - return _session.createTextMessage(msg); - } - - public boolean isShutdown(Message m) - { - return checkText(m, "SHUTDOWN"); - } - - public boolean isReport(Message m) - { - return checkText(m, "REPORT"); - } - - public Object getReport(Message m) - { - try - { - return ((TextMessage) m).getText(); - } - catch (JMSException e) - { - e.printStackTrace(System.out); - - return e.toString(); - } - } - - private static boolean checkText(Message m, String s) - { - try - { - return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s); - } - catch (JMSException e) - { - e.printStackTrace(System.out); - - return false; - } - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java deleted file mode 100644 index d709da6432..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.publisher; - -public class MessageFactoryException extends Exception -{ - public MessageFactoryException(String msg, Throwable t) - { - super(msg, t); - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java deleted file mode 100644 index 3d16e01af4..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.publisher; - - -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -import javax.jms.DeliveryMode; -import javax.jms.JMSException; - -/** - * Class that sends heartbeat messages to allow monitoring of message consumption Sends regular (currently 20 seconds - * apart) heartbeat message - */ -public class MonitorMessageDispatcher -{ - - private static final Logger _logger = LoggerFactory.getLogger(MonitorMessageDispatcher.class); - - protected static MonitorPublisher _monitorPublisher = null; - - protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; - - /** - * Easy entry point for running a message dispatcher for monitoring consumption - * Sends 1000 messages with no delay - * - * @param args - */ - public static void main(String[] args) - { - //Switch on logging appropriately for your app - try - { - int i =0; - while (i < 1000) - { - try - { - //endlessly publish messages to monitor queue - publish(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Dispatched monitor message"); - } - - //sleep for twenty seconds and then publish again - change if appropriate - //Thread.sleep(1000); - i++ ; - } - catch (UndeliveredMessageException a) - { - //trigger application specific failure handling here - _logger.error("Problem delivering monitor message"); - break; - } - } - } - catch (Exception e) - { - _logger.error("Error trying to dispatch AMS monitor message: " + e); - System.exit(1); - } - finally - { - if (getMonitorPublisher() != null) - { - getMonitorPublisher().cleanup(); - } - } - - System.exit(1); - } - - /** - * Publish heartbeat message - * - * @throws JMSException - * @throws UndeliveredMessageException - */ - public static void publish() throws JMSException, UndeliveredMessageException - { - //Send the message generated from the payload using the _publisher -// getMonitorPublisher().sendImmediateMessage -// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); - - getMonitorPublisher().sendMessage - (getMonitorPublisher()._session, - FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()), - DeliveryMode.PERSISTENT, false, true); - - } - - /** Cleanup publishers */ - public static void cleanup() - { - if (getMonitorPublisher() != null) - { - getMonitorPublisher().cleanup(); - } - - if (getMonitorPublisher() != null) - { - getMonitorPublisher().cleanup(); - } - } - - //Returns a _publisher for the monitor queue - private static MonitorPublisher getMonitorPublisher() - { - if (_monitorPublisher != null) - { - return _monitorPublisher; - } - - //Create a _publisher using failover details and constant for monitor queue - _monitorPublisher = new MonitorPublisher(); - - _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); - return _monitorPublisher; - } - -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java deleted file mode 100644 index 750f57d9dc..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.publisher; - -import org.apache.qpid.client.BasicMessageProducer; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; - -/** - * Subclass of Publisher which uses QPID functionality to send a heartbeat message Note immediate flag not available via - * JMS MessageProducer - */ -public class MonitorPublisher extends Publisher -{ - - private static final Logger _log = LoggerFactory.getLogger(Publisher.class); - - BasicMessageProducer _producer; - - public MonitorPublisher() - { - super(); - } - - /* - * Publishes a message using given details - */ - public boolean sendMessage(Session session, Message message, int deliveryMode, - boolean immediate, boolean commit) throws UndeliveredMessageException - { - try - { - _producer = (BasicMessageProducer) session.createProducer(_destination); - - _producer.send(message, deliveryMode, immediate); - - if (commit) - { - //commit the message send and close the transaction - _session.commit(); - } - - } - catch (JMSException e) - { - //Have to assume our commit failed but do not rollback here as channel closed - _log.error("JMSException", e); - e.printStackTrace(); - throw new UndeliveredMessageException("Cannot deliver immediate message", e); - } - - _log.info(_name + " finished sending message: " + message); - return true; - } - - /* - * Publishes a non-persistent message using transacted session - */ - public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException - { - try - { - _producer = (BasicMessageProducer) _session.createProducer(_destination); - - //Send message via our producer which is not persistent and is immediate - //NB: not available via jms interface MessageProducer - _producer.send(message, DeliveryMode.NON_PERSISTENT, true); - - //commit the message send and close the transaction - _session.commit(); - - } - catch (JMSException e) - { - //Have to assume our commit failed but do not rollback here as channel closed - _log.error("JMSException", e); - e.printStackTrace(); - throw new UndeliveredMessageException("Cannot deliver immediate message", e); - } - - _log.info(_name + " finished sending message: " + message); - return true; - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java deleted file mode 100644 index a92efe99ac..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MultiMessageDispatcher.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.publisher; - -import java.io.File; - -import javax.jms.JMSException; -import javax.jms.TextMessage; - - -import org.apache.qpid.example.shared.FileUtils; -import org.apache.qpid.example.shared.Statics; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -/** - * Class that sends parameterised number of message files to the Publisher - * Must set properties for host in properties file or uses in vm broker - */ -public class MultiMessageDispatcher -{ - - protected static final Logger _logger = LoggerFactory.getLogger(FileMessageDispatcher.class); - - protected static Publisher _publisher = null; - - /** - * To use this main method you need to specify a path or file to use for input - * This class then uses file contents from the dir/file specified to generate - * messages to publish - * Intended to be a very simple way to get going with publishing using the broker - * @param args - must specify one value, the path to file(s) for publisher - */ - public static void main(String[] args) - { - - // Check command line args ok - must provide a path or file for us to dispatch - if (args.length < 2) - { - System.out.println("Usage: MultiMessageDispatcher <numberOfMessagesToSend> <topic(true|false)>" + ""); - } - else - { - boolean topicPublisher = true; - - try - { - // publish message(s) - topicPublisher = new Boolean(args[1]).booleanValue(); - publish(new Integer(args[0]).intValue(),topicPublisher); - - // Move payload file(s) to archive location as no error - FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH)); - } - catch (Exception e) - { - // log error and exit - _logger.error("Error trying to dispatch message: " + e); - System.exit(1); - } - finally - { - - cleanup(topicPublisher); - } - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("Finished dispatching message"); - } - - System.exit(0); - } - - /** - * Publish the content of a file or files from a directory as messages - * @param numMessages - from main args - * @throws javax.jms.JMSException - * @throws org.apache.qpid.example.publisher.MessageFactoryException - if cannot create message from file content - */ - public static void publish(int numMessages, boolean topicPublisher) throws JMSException, MessageFactoryException - { - { - // Send the message generated from the payload using the _publisher - getPublisher(topicPublisher).sendMessage(numMessages); - } - } - - /** - * Cleanup before exit - */ - public static void cleanup(boolean topicPublisher) - { - if (getPublisher(topicPublisher) != null) - { - getPublisher(topicPublisher).cleanup(); - } - } - - /** - * @return A Publisher instance - */ - private static Publisher getPublisher(boolean topic) - { - if (_publisher != null) - { - return _publisher; - } - - if (!topic) - { - // Create a _publisher - _publisher = new Publisher(); - } - else - { - _publisher = new TopicPublisher(); - } - return _publisher; - } - -}
\ No newline at end of file diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java deleted file mode 100644 index b5f44557a4..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.publisher; - -import org.apache.qpid.client.AMQConnectionFactory; - -import javax.jms.*; - -import javax.naming.InitialContext; - -import org.apache.qpid.example.shared.InitialContextHelper; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -public class Publisher -{ - private static final Logger _log = LoggerFactory.getLogger(Publisher.class); - - protected InitialContextHelper _contextHelper; - - protected Connection _connection; - - protected Session _session; - - protected MessageProducer _producer; - - protected String _destinationDir; - - protected String _name = "Publisher"; - - protected Destination _destination; - - protected static final String _defaultDestinationDir = "/tmp"; - - /** - * Creates a Publisher instance using properties from example.properties - * See InitialContextHelper for details of how context etc created - */ - public Publisher() - { - try - { - //get an initial context from default properties - _contextHelper = new InitialContextHelper(null); - InitialContext ctx = _contextHelper.getInitialContext(); - - //then create a connection using the AMQConnectionFactory - AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local"); - _connection = cf.createConnection(); - - _connection.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException jmse) - { - // The connection may have broken invoke reconnect code if available. - // The connection may have broken invoke reconnect code if available. - System.err.println("ExceptionListener caught: " + jmse); - //System.exit(0); - } - }); - - //create a transactional session - _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - - //lookup the example queue and use it - //Queue is non-exclusive and not deleted when last consumer detaches - _destination = (Queue) ctx.lookup("MyQueue"); - - //create a message producer - _producer = _session.createProducer(_destination); - - //set destination dir for files that have been processed - _destinationDir = _defaultDestinationDir; - - _connection.start(); - } - catch (Exception e) - { - e.printStackTrace(); - _log.error("Exception", e); - } - } - - /** - * Creates and sends the number of messages specified in the param - */ - public void sendMessage(int numMessages) - { - try - { - TextMessage txtMessage = _session.createTextMessage("msg"); - for (int i=0;i<numMessages;i++) - { - sendMessage(txtMessage); - _log.info("Sent: " + i); - } - } - catch (JMSException j) - { - _log.error("Exception in sendMessage" + j); - } - - - } - - /** - * Publishes a non-persistent message using transacted session - * Note that persistent is the default mode for send - so need to specify for transient - */ - public boolean sendMessage(Message message) - { - try - { - //Send message via our producer which is not persistent - _producer.send(message, DeliveryMode.PERSISTENT, _producer.getPriority(), _producer.getTimeToLive()); - - //commit the message send and close the transaction - _session.commit(); - - } - catch (JMSException e) - { - //Have to assume our commit failed and rollback here - try - { - _session.rollback(); - _log.error("JMSException", e); - e.printStackTrace(); - return false; - } - catch (JMSException j) - { - _log.error("Unable to rollback publish transaction ",e); - return false; - } - } - - //_log.info(_name + " finished sending message: " + message); - return true; - } - - /** - * Cleanup resources before exit - */ - public void cleanup() - { - try - { - if (_connection != null) - { - _connection.stop(); - _connection.close(); - } - _connection = null; - _producer = null; - } - catch(Exception e) - { - _log.error("Error trying to cleanup publisher " + e); - System.exit(1); - } - } - - /** - * Exposes session - * @return Session - */ - public Session getSession() - { - return _session; - } - - public String getDestinationDir() - { - return _destinationDir; - } - - public void setDestinationDir(String destinationDir) - { - _destinationDir = destinationDir; - } - - public String getName() - { - return _name; - } - - public void setName(String _name) { - this._name = _name; - } -} - diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java deleted file mode 100644 index 8645e41101..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.publisher; - -import org.apache.qpid.client.BasicMessageProducer; -import org.apache.qpid.example.shared.InitialContextHelper; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -import javax.jms.*; -import javax.naming.InitialContext; - -/** - * Subclass of Publisher which sends messages to a topic destination defined in example.properties - */ -public class TopicPublisher extends Publisher -{ - - private static final Logger _log = LoggerFactory.getLogger(Publisher.class); - - public TopicPublisher() - { - super(); - - try - { - _contextHelper = new InitialContextHelper(null); - InitialContext ctx = _contextHelper.getInitialContext(); - - //lookup the example topic and use it - _destination = (Topic) ctx.lookup("MyTopic"); - - //create a message producer - _producer = _session.createProducer(_destination); - } - catch (Exception e) - { - //argh - _log.error("Exception trying to construct TopicPublisher" + e); - } - - } -}
\ No newline at end of file diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java deleted file mode 100644 index 245008b68a..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.publisher; - -/** - * Exception thrown by monitor when cannot send a message marked for immediate delivery - */ -public class UndeliveredMessageException extends Exception -{ - public UndeliveredMessageException(String msg, Throwable t) - { - super(msg, t); - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java deleted file mode 100644 index e32ee0ba73..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.example.pubsub; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Session; -import javax.naming.NamingException; - -/** - * An abstract base class that wraps up the creation of a JMS client utilising JNDI - */ -public abstract class Client -{ - protected ConnectionSetup _setup; - - protected Connection _connection; - protected Destination _destination; - protected Session _session; - - public Client(String destination) - { - if (destination == null) - { - destination = ConnectionSetup.TOPIC_JNDI_NAME; - } - - try - { - _setup = new ConnectionSetup(); - } - catch (NamingException e) - { - //ignore - } - - if (_setup != null) - { - try - { - _connection = _setup.getConnectionFactory().createConnection(); - _destination = _setup.getDestination(destination); - } - catch (JMSException e) - { - System.err.println(e.getMessage()); - } - } - } - - public abstract void start(); - -}
\ No newline at end of file diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java deleted file mode 100644 index 0734704e59..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.example.pubsub; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import java.util.Properties; - -/** - * This ConnectionSetup is a wrapper around JNDI it creates a number of entries. - * - * It is equivalent to a PropertyFile of value: - * - * connectionfactory.local=amqp://guest:guest@clientid/test?brokerlist='localhost' - * - * queue.queue=example.MyQueue - * topic.topic=example.hierarical.topic - * - */ -public class ConnectionSetup -{ - final static String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; - - final static String CONNECTION_JNDI_NAME = "local"; - final static String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='localhost'"; - - public static final String QUEUE_JNDI_NAME = "queue"; - final static String QUEUE_NAME = "example.MyQueue"; - - public static final String TOPIC_JNDI_NAME = "topic"; - final static String TOPIC_NAME = "usa.news"; - - private Context _ctx; - - public ConnectionSetup() throws NamingException - { - - // Set the properties ... - Properties properties = new Properties(); - properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); - properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME); - - properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME); - properties.put("topic." + TOPIC_JNDI_NAME, TOPIC_NAME); - // Create the initial context - _ctx = new InitialContext(properties); - - } - - public ConnectionSetup(Properties properties) throws NamingException - { - _ctx = new InitialContext(properties); - } - - public ConnectionFactory getConnectionFactory() - { - - // Perform the lookups - try - { - return (ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME); - } - catch (NamingException e) - { - //ignore - } - return null; - } - - public Destination getDestination(String jndiName) - { - // Perform the lookups - try - { - return (Destination) _ctx.lookup(jndiName); - } - catch (ClassCastException cce) - { - //ignore - } - catch (NamingException ne) - { - //ignore - } - return null; - } - - - public void close() - { - try - { - _ctx.close(); - } - catch (NamingException e) - { - //ignore - } - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java deleted file mode 100644 index ac3829d49e..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.example.pubsub; - -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; - -/** - * A simple Publisher example. - * - * The class can take two arguments. - * java Publisher <destination> <msgCount> - * Where: - * destination is either 'topic' or 'queue' (Default: topic) - * msgCount is the number of messages to send (Default : 100) - * - */ -public class Publisher extends Client -{ - int _msgCount; - - public Publisher(String destination, int msgCount) - { - super(destination); - _msgCount = msgCount; - } - - public void start() - { - try - { - _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer _producer = _session.createProducer(_destination); - - for (int msgCount = 0; msgCount < _msgCount; msgCount++) - { - _producer.send(_session.createTextMessage("msg:" + msgCount)); - System.out.println("Sent:" + msgCount); - } - - System.out.println("Done."); - _connection.close(); - } - catch (JMSException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - - - public static void main(String[] args) - { - - String destination = args.length > 2 ? args[1] : "usa.news"; - - int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; - - new Publisher(destination, msgCount).start(); - } - -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java deleted file mode 100644 index f2d736701f..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.example.pubsub; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import java.util.concurrent.CountDownLatch; - - -/** - * Simple client that listens for the specified number of msgs on the given Destinaton - * - * The class can take two arguments. - * java Subscriber <destination> <msgCount> - * Where: - * destination is either 'topic' or 'queue' (Default: topic) - * msgCount is the number of messages to send (Default : 100) - */ -public class Subscriber extends Client implements MessageListener -{ - - CountDownLatch _count; - - public Subscriber(String destination, int msgCount) - { - super(destination); - _count = new CountDownLatch(msgCount); - } - - - public void start() - { - try - { - _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _session.createDurableSubscriber((Topic) _setup.getDestination(ConnectionSetup.TOPIC_JNDI_NAME), - "exampleClient").setMessageListener(this); - _connection.start(); - _count.await(); - - System.out.println("Done"); - - _connection.close(); - } - catch (JMSException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - - public static void main(String[] args) - { - String destination = args.length > 2 ? args[1] : null; - int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; - - new Subscriber(destination, msgCount).start(); - } - - public void onMessage(Message message) - { - try - { - _count.countDown(); - System.out.println("Received msg:" + ((TextMessage) message).getText()); - } - catch (JMSException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java deleted file mode 100644 index 1a3d596a24..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.shared; - -public class ConnectionException extends Exception -{ - public ConnectionException(String msg, Throwable t) - { - super(msg, t); - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java deleted file mode 100644 index 2987a9559b..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.shared; - -public class ContextException extends Exception -{ - public ContextException(String msg, Throwable t) - { - super(msg, t); - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java deleted file mode 100644 index 54446cb6a7..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.shared; - -import java.io.*; - -/** - * Class that provides file related utility methods for utility use - */ -public class FileUtils { - - - //Reads file content into String - public static String getFileContent(String filePath) throws IOException - { - - BufferedReader reader = null; - String tempData = ""; - String eol = "\n\r"; - - try - { - String line; - reader = new BufferedReader(new FileReader(filePath)); - while ((line = reader.readLine()) != null) - { - if (!tempData.equals("")) - { - tempData = tempData + eol + line; - } - else - { - tempData = line; - } - } - } - finally - { - if (reader != null) - { - reader.close(); - } - } - return tempData; - } - - /* - * Reads xml from a file and returns it as an array of chars - */ - public static char[] getFileAsCharArray(String filePath) throws IOException - { - BufferedReader reader = null; - char[] tempChars = null; - String tempData = ""; - - try - { - String line; - reader = new BufferedReader(new FileReader(filePath)); - while ((line = reader.readLine()) != null) - { - tempData = tempData + line; - } - tempChars = tempData.toCharArray(); - } - finally - { - if (reader != null) - { - reader.close(); - } - } - return tempChars; - } - - /* - * Write String content to filename provided - */ - public static void writeStringToFile(String content, String path) throws IOException - { - - BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path))); - writer.write(content); - writer.flush(); - writer.close(); - } - - /* - * Allows moving of files to a new dir and preserves the last bit of the name only - */ - public static void moveFileToNewDir(String path, String newDir) throws IOException - { - //get file name from current path - //while more files in dir publish them - File pathFile = new File(path); - if (pathFile.isDirectory()) - { - File[] files = pathFile.listFiles(); - for (File file : files) - { - moveFileToNewDir(file,newDir); - } - } - } - - /* - * Allows moving of a file to a new dir and preserves the last bit of the name only - */ - public static void moveFileToNewDir(File fileToMove, String newDir) throws IOException - { - moveFile(fileToMove,getArchiveFileName(fileToMove,newDir)); - } - - /* - * Moves file from a given path to a new path with String params - */ - public static void moveFile(String fromPath, String dest) throws IOException - { - moveFile(new File(fromPath),new File(dest)); - } - - /* - * Moves file from a given path to a new path with mixed params - */ - public static void moveFile(File fileToMove, String dest) throws IOException - { - moveFile(fileToMove,new File(dest)); - } - - /* - * Moves file from a given path to a new path with File params - */ - public static void moveFile(File fileToMove, File dest) throws IOException - { - fileToMove.renameTo(dest); - } - - /* - * Deletes a given file - */ - public static void deleteFile(String filePath) throws IOException - { - new File(filePath).delete(); - } - - private static String getArchiveFileName(File fileToMove, String archiveDir) - { - //get file name from current path - String fileName = fileToMove.getName(); - return archiveDir + File.separator + fileName; - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java deleted file mode 100644 index 16a185133a..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.shared; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -/** - * Class that provides helper methods for JNDI - */ -public class InitialContextHelper -{ - - public static final String _defaultPropertiesName = "example.properties"; - protected Properties _fileProperties; - protected InitialContext _initialContext; - protected static final Logger _log = LoggerFactory.getLogger(InitialContextHelper.class); - - public InitialContextHelper(String propertiesName) throws ContextException - { - try - { - if ((propertiesName == null) || (propertiesName.length() == 0)) - { - propertiesName = _defaultPropertiesName; - } - - _fileProperties = new Properties(); - ClassLoader cl = this.getClass().getClassLoader(); - - // NB: Need to change path to reflect package if moving classes around ! - InputStream is = cl.getResourceAsStream("org/apache/qpid/example/shared/" + propertiesName); - _fileProperties.load(is); - _initialContext = new InitialContext(_fileProperties); - } - catch (IOException e) - { - throw new ContextException(e.toString(), e); - } - catch (NamingException n) - { - throw new ContextException(n.toString(), n); - } - } - - public Properties getFileProperties() - { - return _fileProperties; - } - - public InitialContext getInitialContext() - { - return _initialContext; - } - -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java deleted file mode 100644 index c056f8a7da..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.shared; - -/** - * Constants used by AMS Publisher/Subscriber classes - */ -public class Statics { - - public static final String TOPIC_NAME = "EXAMPLE_TOPIC"; - - public static final String QUEUE_NAME = "EXAMPLE_QUEUE"; - - public static final String MONITOR_QUEUE_SUFFIX = "_MONITOR"; - - public static final String HOST_PROPERTY = "host"; - - public static final String PORT_PROPERTY = "port"; - - public static final String USER_PROPERTY = "user"; - - public static final String PWD_PROPERTY = "pwd"; - - public static final String TOPIC_PROPERTY = "topic"; - - public static final String QUEUE_PROPERTY = "queue"; - - public static final String VIRTUAL_PATH_PROPERTY = "virtualpath"; - - public static final String ARCHIVE_PATH = "archivepath"; - - public static final String CLIENT_PROPERTY = "client"; - - public static final String FILENAME_PROPERTY = "filename"; - - public static final String DEFAULT_USER = "guest"; - - public static final String DEFAULT_PWD = "guest"; - - -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties deleted file mode 100644 index c76acbd8b9..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties +++ /dev/null @@ -1,40 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory - -# use the following property to configure the default connector -#java.naming.provider.url - ignored. - -# register some connection factories -# connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.local = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' - -# register some queues in JNDI using the form -# queue.[jndiName] = [physicalName] -queue.MyQueue = example.MyQueue - -# register some topics in JNDI using the form -# topic.[jndiName] = [physicalName] -topic.ibmStocks = stocks.nyse.ibm -topic.MyTopic = example.MyTopic - -# Register an AMQP destination in JNDI -# NOTE: Qpid currently only supports direct,topics and headers -# destination.[jniName] = [BindingURL] -destination.direct = direct://amq.direct//directQueue diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java deleted file mode 100644 index 8a0ff88448..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.example.simple.reqresp; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; - -public class Client implements MessageListener -{ - final String BROKER = "localhost"; - - final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; - - final String CONNECTION_JNDI_NAME = "local"; - final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'"; - - final String QUEUE_JNDI_NAME = "queue"; - final String QUEUE_NAME = "example.RequestQueue"; - - - private InitialContext _ctx; - - private CountDownLatch _shutdownHook = new CountDownLatch(1); - - public Client() - { - setupJNDI(); - - Connection connection; - Session session; - Destination responseQueue; - - //Setup the connection. Create producer to sent message and consumer to receive the repsonse. - MessageProducer _producer; - try - { - connection = ((ConnectionFactory) lookupJNDI(CONNECTION_JNDI_NAME)).createConnection(); - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Destination requestQueue = (Queue) lookupJNDI(QUEUE_JNDI_NAME); - - closeJNDI(); - - //Setup a message _producer to send message to the queue the server is consuming from - _producer = session.createProducer(requestQueue); - - //Create a temporary queue that this client will listen for responses on then create a consumer - //that consumes message from this temporary queue. - responseQueue = session.createTemporaryQueue(); - - MessageConsumer responseConsumer = session.createConsumer(responseQueue); - - //Set a listener to asynchronously deal with responses. - responseConsumer.setMessageListener(this); - - // Now the connection is setup up start it. - connection.start(); - } - catch (JMSException e) - { - System.err.println("Unable to setup connection, client and producer on broker"); - return; - } - - // Setup the message to send - TextMessage txtMessage; - try - { - //Now create the actual message you want to send - txtMessage = session.createTextMessage("Request Process"); - - //Set the reply to field to the temp queue you created above, this is the queue the server will respond to - txtMessage.setJMSReplyTo(responseQueue); - - //Set a correlation ID so when you get a response you know which sent message the response is for - //If there is never more than one outstanding message to the server then the - //same correlation ID can be used for all the messages...if there is more than one outstanding - //message to the server you would presumably want to associate the correlation ID with this message - - txtMessage.setJMSCorrelationID(txtMessage.getJMSMessageID()); - } - catch (JMSException e) - { - System.err.println("Unable to create message"); - return; - - } - - try - { - _producer.send(txtMessage); - } - catch (JMSException e) - { - //Handle the exception appropriately - } - - try - { - System.out.println("Sent Request Message ID :" + txtMessage.getJMSMessageID()); - } - catch (JMSException e) - { - //Handle exception more appropriately. - } - - //Wait for the return message to arrive - try - { - _shutdownHook.await(); - } - catch (InterruptedException e) - { - // Ignore this as we are quitting anyway. - } - - //Close the connection - try - { - connection.close(); - } - catch (JMSException e) - { - System.err.println("A problem occured while shutting down the connection : " + e); - } - } - - - /** - * Implementation of the Message Listener interface. - * This is where message will be asynchronously delivered. - * - * @param message - */ - public void onMessage(Message message) - { - String messageText; - try - { - if (message instanceof TextMessage) - { - TextMessage textMessage = (TextMessage) message; - messageText = textMessage.getText(); - System.out.println("messageText = " + messageText); - System.out.println("Correlation ID " + message.getJMSCorrelationID()); - - _shutdownHook.countDown(); - } - else - { - System.err.println("Unexpected message delivered"); - } - } - catch (JMSException e) - { - //Handle the exception appropriately - } - } - - /** - * Lookup the specified name in the JNDI Context. - * - * @param name The string name of the object to lookup - * - * @return The object or null if nothing exists for specified name - */ - private Object lookupJNDI(String name) - { - try - { - return _ctx.lookup(name); - } - catch (NamingException e) - { - System.err.println("Error looking up '" + name + "' in JNDI Context:" + e); - } - - return null; - } - - /** - * Setup the JNDI context. - * - * In this case we are simply using a Properties object to store the pairing information. - * - * Further details can be found on the wiki site here: - * - * @see : http://cwiki.apache.org/qpid/how-to-use-jndi.html - */ - private void setupJNDI() - { - // Set the properties ... - Properties properties = new Properties(); - properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); - properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME); - properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME); - - // Create the initial context - Context ctx = null; - try - { - _ctx = new InitialContext(properties); - } - catch (NamingException e) - { - System.err.println("Error Setting up JNDI Context:" + e); - } - } - - /** Close the JNDI Context to keep everything happy. */ - private void closeJNDI() - { - try - { - _ctx.close(); - } - catch (NamingException e) - { - System.err.println("Unable to close JNDI Context : " + e); - } - } - - - public static void main(String[] args) - { - new Client(); - } -} - diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java deleted file mode 100644 index 9c284eee97..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.example.simple.reqresp; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.io.BufferedReader; -import java.io.BufferedInputStream; -import java.io.Reader; -import java.io.InputStreamReader; -import java.io.IOException; - -public class Server implements MessageListener -{ - final String BROKER = "localhost"; - - final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; - - final String CONNECTION_JNDI_NAME = "local"; - final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'"; - - final String QUEUE_JNDI_NAME = "queue"; - final String QUEUE_NAME = "example.RequestQueue"; - - - private InitialContext _ctx; - private Session _session; - private MessageProducer _replyProducer; - private CountDownLatch _shutdownHook = new CountDownLatch(1); - - public Server() - { - setupJNDI(); - - Connection connection; - try - { - connection = ((ConnectionFactory) lookupJNDI(CONNECTION_JNDI_NAME)).createConnection(); - - _session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Destination requestQueue = (Queue) lookupJNDI(QUEUE_JNDI_NAME); - - closeJNDI(); - - //Setup a message producer to respond to messages from clients, we will get the destination - //to send to from the JMSReplyTo header field from a Message so we MUST set the destination here to null. - this._replyProducer = _session.createProducer(null); - - //Set up a consumer to consume messages off of the request queue - MessageConsumer consumer = _session.createConsumer(requestQueue); - consumer.setMessageListener(this); - - //Now start the connection - connection.start(); - } - catch (JMSException e) - { - //Handle the exception appropriately - System.err.println("JMSException occured setting up server :" + e); - return; - } - - System.out.println("Server process started and waiting for messages."); - - //Wait to process an single message then quit. - while (_shutdownHook.getCount() != 0) - { - try - { - _shutdownHook.await(); - } - catch (InterruptedException e) - { - // Ignore this as we are quitting anyway. - } - } - - //Close the connection - try - { - connection.close(); - } - catch (JMSException e) - { - System.err.println("A problem occured while shutting down the connection : " + e); - } - } - - public void onMessage(Message message) - { - try - { - TextMessage response = this._session.createTextMessage(); - - //Check we have the right message type. - if (message instanceof TextMessage) - { - TextMessage txtMsg = (TextMessage) message; - String messageText = txtMsg.getText(); - - //Perform the request - System.out.println("Received request:" + messageText + " for message :" + message.getJMSMessageID()); - - //Set the response back to the client - response.setText("Response to Request:" + messageText); - } - - //Set the correlation ID from the received message to be the correlation id of the response message - //this lets the client identify which message this is a response to if it has more than - //one outstanding message to the server - response.setJMSCorrelationID(message.getJMSMessageID()); - - try - { - System.out.println("Received message press enter to send response...."); - new BufferedReader(new InputStreamReader(System.in)).readLine(); - } - catch (IOException e) - { - //Error attemptying to pause - } - - //Send the response to the Destination specified by the JMSReplyTo field of the received message. - _replyProducer.send(message.getJMSReplyTo(), response); - } - catch (JMSException e) - { - //Handle the exception appropriately - } - - _shutdownHook.countDown(); - } - - /** - * Lookup the specified name in the JNDI Context. - * - * @param name The string name of the object to lookup - * - * @return The object or null if nothing exists for specified name - */ - private Object lookupJNDI(String name) - { - try - { - return _ctx.lookup(name); - } - catch (NamingException e) - { - System.err.println("Error looking up '" + name + "' in JNDI Context:" + e); - } - - return null; - } - - /** - * Setup the JNDI context. - * - * In this case we are simply using a Properties object to store the pairing information. - * - * Further details can be found on the wiki site here: - * - * @see : http://cwiki.apache.org/qpid/how-to-use-jndi.html - */ - private void setupJNDI() - { - // Set the properties ... - Properties properties = new Properties(); - properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); - properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME); - properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME); - - // Create the initial context - Context ctx = null; - try - { - _ctx = new InitialContext(properties); - } - catch (NamingException e) - { - System.err.println("Error Setting up JNDI Context:" + e); - } - } - - /** Close the JNDI Context to keep everything happy. */ - private void closeJNDI() - { - try - { - _ctx.close(); - } - catch (NamingException e) - { - System.err.println("Unable to close JNDI Context : " + e); - } - } - - - public static void main(String[] args) - { - new Server(); - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java deleted file mode 100644 index e4eb5ac7f5..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.subscriber; - -import org.apache.qpid.example.shared.Statics; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -import javax.jms.*; -/** - * Subclass of Subscriber which consumes a heartbeat message - */ - -public class MonitoredSubscriber extends Subscriber -{ - protected String _monitorDestinationName; - - private static final Logger _logger = LoggerFactory.getLogger(MonitoredSubscriber.class); - - private MessageConsumer _monitorConsumer; - - public MonitoredSubscriber() - { - super(); - //lookup queue name and append suffix - _monitorDestinationName = _destination.toString() + Statics.MONITOR_QUEUE_SUFFIX; - } - - /** - * MessageListener implementation for this subscriber - */ - public static class MonitorMessageListener implements MessageListener - { - private String _name; - - public MonitorMessageListener(String name) - { - _name = name; - - } - - /** - * Listens for heartbeat messages and acknowledges them - * @param message - */ - public void onMessage(javax.jms.Message message) - { - _logger.info(_name + " monitor got message '" + message + "'"); - - try - { - _logger.debug("Monitor acknowledging recieved message"); - - //Now acknowledge the message to clear it from our queue - message.acknowledge(); - } - catch(JMSException j) - { - _logger.error("Monitor caught JMSException trying to acknowledge message receipt"); - j.printStackTrace(); - } - catch(Exception e) - { - _logger.error("Monitor caught unexpected exception trying to handle message"); - e.printStackTrace(); - } - } - } - - /** - * Subscribes to Queue and attaches additional monitor listener - */ - public void subscribeAndMonitor() - { - try - { - _connection = _connectionFactory.createConnection(); - - //create a transactional session - Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - - //Queue is non-exclusive and not deleted when last consumer detaches - Destination destination = session.createQueue(_monitorDestinationName); - - //Create a consumer with a destination of our queue which will use defaults for prefetch etc - _monitorConsumer = session.createConsumer(destination); - - //give the monitor message listener a name of it's own - _monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener - ("MonitorListener " + System.currentTimeMillis())); - - MonitoredSubscriber._logger.info("Starting monitored subscription ..."); - - _connection.start(); - - //and now start ordinary consumption too - subscribe(); - } - catch (Throwable t) - { - _logger.error("Fatal error: " + t); - t.printStackTrace(); - } - } - - /** - * Stop consuming - */ - public void stopMonitor() - { - try - { - _monitorConsumer.close(); - _monitorConsumer = null; - stop(); - } - catch(JMSException j) - { - _logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); - } - } - -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java deleted file mode 100644 index 5e78107182..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.subscriber; - - -/** - * Allows you to simply start a monitored subscriber - */ -public class MonitoredSubscriptionWrapper { - - private static MonitoredSubscriber _subscriber; - - /** - * Create a monitored subscriber and start it - * @param args - no params required - */ - public static void main(String args[]) - { - _subscriber = new MonitoredSubscriber(); - - _subscriber.subscribe(); - } - - /** - * Stop subscribing now ... - */ - public static void stop() - { - _subscriber.stop(); - } -} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java deleted file mode 100644 index c36668575f..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.subscriber; - -import org.apache.qpid.client.AMQConnectionFactory; - -import javax.jms.*; -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.naming.InitialContext; - -import org.apache.qpid.example.shared.InitialContextHelper; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -/** - * Subscriber which consumes messages from a queue - */ - -public class Subscriber -{ - private static final Logger _log = LoggerFactory.getLogger(Subscriber.class); - - protected Connection _connection; - - protected MessageConsumer _consumer; - - protected InitialContextHelper _contextHelper; - - protected AMQConnectionFactory _connectionFactory; - - protected Destination _destination; - - public Subscriber() - { - try - { - //get an initial context from default properties - _contextHelper = new InitialContextHelper(null); - InitialContext ctx = _contextHelper.getInitialContext(); - - //then create a connection using the AMQConnectionFactory - _connectionFactory = (AMQConnectionFactory) ctx.lookup("local"); - - //lookup queue from context - _destination = (Destination) ctx.lookup("MyQueue"); - - } - catch (Exception e) - { - e.printStackTrace(); - _log.error("Exception", e); - } - } - - /** - * Listener class that handles messages - */ - public static class ExampleMessageListener implements MessageListener - { - private String _name; - - public ExampleMessageListener(String name) - { - _name = name; - } - - /** - * Listens for message callbacks, handles and then acknowledges them - * @param message - the message received - */ - public void onMessage(javax.jms.Message message) - { - _log.info(_name + " got message '" + message + "'"); - - try - { - //NB: Handle your message appropriately for your application here - //do some stuff - - _log.debug("Acknowledging recieved message"); - - //Now acknowledge the message to clear it from our queue - message.acknowledge(); - } - catch(JMSException j) - { - _log.error("JMSException trying to acknowledge message receipt"); - j.printStackTrace(); - } - catch(Exception e) - { - _log.error("Unexpected exception trying to handle message"); - e.printStackTrace(); - } - } - } - - /** - * Subscribes to example Queue and attaches listener - */ - public void subscribe() - { - _log.info("Starting subscription ..."); - - try - { - _connection = _connectionFactory.createConnection(); - - //Non transactional session using client acknowledgement - Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - //Create a consumer with a destination of our queue which will use defaults for prefetch etc - _consumer = session.createConsumer(_destination); - - //give the message listener a name of it's own - _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis())); - - _connection.start(); - } - catch (Throwable t) - { - _log.error("Fatal error: " + t); - t.printStackTrace(); - } - - _log.info("Waiting for messages ..."); - - //wait for messages and sleep to survive failover - try - { - while(true) - { - Thread.sleep(Long.MAX_VALUE); - } - } - catch (Exception e) - { - _log.warn("Exception while Subscriber sleeping",e); - } - } - - /** - * Stop consuming and close connection - */ - public void stop() - { - try - { - _consumer.close(); - _consumer = null; - _connection.stop(); - _connection.close(); - } - catch(JMSException j) - { - _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); - } - } - -} - - - - diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java deleted file mode 100644 index f8fbf63037..0000000000 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.subscriber; - -/** - * Allows you to simply start a subscriber - */ -public class SubscriptionWrapper { - - private static Subscriber _subscriber; - - /** - * Create a subscriber and start it - * @param args - */ - public static void main(String args[]) - { - _subscriber = new Subscriber(); - - _subscriber.subscribe(); - } - - /** - * Stop subscribing now ... - */ - public static void stop() - { - _subscriber.stop(); - } -} |