diff options
author | Marnie McCormack <marnie@apache.org> | 2006-12-12 10:18:43 +0000 |
---|---|---|
committer | Marnie McCormack <marnie@apache.org> | 2006-12-12 10:18:43 +0000 |
commit | e55c6d1544ac87c1388ad1fe1fdd149c8b6fe48f (patch) | |
tree | ef02ba579c85dca5638991e874279649a0c03fb9 /java | |
parent | 9b45e0ab5d2a719d8ab4e7ec3f23d63ba4966f80 (diff) | |
download | qpid-python-e55c6d1544ac87c1388ad1fe1fdd149c8b6fe48f.tar.gz |
Re-introduced example classes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486084 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
19 files changed, 1884 insertions, 0 deletions
diff --git a/java/client/example/pom.xml b/java/client/example/pom.xml new file mode 100644 index 0000000000..d02b09edd5 --- /dev/null +++ b/java/client/example/pom.xml @@ -0,0 +1,111 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-example</artifactId> + <packaging>jar</packaging> + <version>1.0-incubating-M2-SNAPSHOT</version> + <name>Qpid Example</name> + <url>http://cwiki.apache.org/confluence/display/qpid</url> + + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid</artifactId> + <version>1.0-incubating-M2-SNAPSHOT</version> + </parent> + + <properties> + <topDirectoryLocation>..</topDirectoryLocation> + <amqj.logging.level>warn</amqj.logging.level> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-client</artifactId> + </dependency> + + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + </dependency> + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <dependency> + <groupId>org.apache.mina</groupId> + <artifactId>mina-filter-ssl</artifactId> + </dependency> + + <dependency> + <groupId>jmscts</groupId> + <artifactId>jmscts</artifactId> + <version>0.5-b2</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <property> + <name>amqj.noAutoCreateVMBroker</name> + <value>true</value> + </property> + <property> + <name>amqj.logging.level</name> + <value>${amqj.logging.level}</value> + </property> + <property> + <name>log4j.configuration</name> + <value>file:///${basedir}/src/main/java/log4j.properties</value> + </property> + </systemProperties> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml b/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml new file mode 100644 index 0000000000..de64423a51 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml @@ -0,0 +1,45 @@ +<?xml version="1.0"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="FileAppender" class="org.apache.log4j.FileAppender"> + <param name="File" value="ams_messaging.log"/> + <param name="Append" value="false"/> + + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%t %-5p %c{2} - %m%n"/> + </layout> + </appender> + + <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender"> + + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> + </layout> + </appender> + + <root> + <priority value="debug"/> + <appender-ref ref="STDOUT"/> + <appender-ref ref="FileAppender"/> + </root> +</log4j:configuration>
\ No newline at end of file diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java new file mode 100644 index 0000000000..b199d41432 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.log4j.Logger; + +import java.io.File; + +import org.apache.qpid.example.shared.FileUtils; +import org.apache.qpid.example.shared.Statics; + +import javax.jms.JMSException; + +/** + * Class that sends message files to the Publisher to distribute + * using files as input + * Must set properties for host in properties file or uses in vm broker + */ +public class FileMessageDispatcher { + + protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); + + protected static Publisher _publisher = null; + + /** + * To use this main method you need to specify a path or file to use for input + * This class then uses file contents from the dir/file specified to generate + * messages to publish + * Intended to be a very simple way to get going with publishing using the broker + * @param args - must specify one value, the path to file(s) for publisher + */ + public static void main(String[] args) + { + + //Check command line args ok - must provide a path or file for us to dispatch + if (args.length == 0) + { + System.err.println("Usage: FileMessageDispatcher <filesToDispatch>" + ""); + } + else + { + try + { + //publish message(s) from file(s) to configured queue + publish(args[0]); + + //Move payload file(s) to archive location as no error + FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH)); + } + catch(Exception e) + { + //log error and exit + _logger.error("Error trying to dispatch message: " + e); + System.exit(1); + } + finally + { + //clean up before exiting + if (getPublisher() != null) + { + getPublisher().cleanup(); + } + } + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Finished dispatching message"); + } + + System.exit(0); + } + + /** + * Publish the content of a file or files from a directory as messages + * @param path - from main args + * @throws JMSException + * @throws MessageFactoryException - if cannot create message from file content + */ + public static void publish(String path) throws JMSException, MessageFactoryException + { + File tempFile = new File(path); + if (tempFile.isDirectory()) + { + //while more files in dir publish them + File[] files = tempFile.listFiles(); + + if (files == null || files.length == 0) + { + _logger.info("FileMessageDispatcher - No files to publish in input directory: " + tempFile); + } + else + { + for (File file : files) + { + //Create message factory passing in payload path + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString()); + + //Send the message generated from the payload using the _publisher + getPublisher().sendMessage(factory.createEventMessage()); + + } + } + } + else + { + //handle a single file + //Create message factory passing in payload path + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString()); + + //Send the message generated from the payload using the _publisher + getPublisher().sendMessage(factory.createEventMessage()); + } + } + + /** + * Cleanup before exit + */ + public static void cleanup() + { + if (getPublisher() != null) + { + getPublisher().cleanup(); + } + } + + /** + * @return A Publisher instance + */ + private static Publisher getPublisher() + { + if (_publisher != null) + { + return _publisher; + } + + //Create a _publisher + _publisher = new Publisher(); + + return _publisher; + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java new file mode 100644 index 0000000000..88bcbbbccb --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.example.publisher; + +import org.apache.qpid.example.shared.FileUtils; +import org.apache.qpid.example.shared.Statics; + +import java.io.*; +import javax.jms.*; + +public class FileMessageFactory +{ + protected final Session _session; + protected final String _payload; + protected final String _filename; + + /** + * Contructs and instance using a filename from which content will be used to create message + * @param session + * @param filename + * @throws MessageFactoryException + */ + public FileMessageFactory(Session session, String filename) throws MessageFactoryException + { + try + { + _filename = filename; + _payload = FileUtils.getFileContent(filename); + _session = session; + } + catch (IOException e) + { + throw new MessageFactoryException(e.toString()); + } + } + + /** + * Creates a text message and sets filename property on it + * The filename property is purely intended to provide visibility + * of file content passing trhough the broker using example classes + * @return Message - a TextMessage with content from file + * @throws JMSException + */ + public Message createEventMessage() throws JMSException + { + TextMessage msg = _session.createTextMessage(); + msg.setText(_payload); + msg.setStringProperty(Statics.FILENAME_PROPERTY,new File(_filename).getName()); + return msg; + } + + /** + * Creates message from a string for use by the monitor + * @param session + * @param textMsg - message content + * @return Message - TextMessage with content from String + * @throws JMSException + */ + public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException + { + TextMessage msg = session.createTextMessage(); + msg.setText(textMsg); + return msg; + } + + public Message createShutdownMessage() throws JMSException + { + return _session.createTextMessage("SHUTDOWN"); + } + + public Message createReportRequestMessage() throws JMSException + { + return _session.createTextMessage("REPORT"); + } + + public Message createReportResponseMessage(String msg) throws JMSException + { + return _session.createTextMessage(msg); + } + + public boolean isShutdown(Message m) + { + return checkText(m, "SHUTDOWN"); + } + + public boolean isReport(Message m) + { + return checkText(m, "REPORT"); + } + + public Object getReport(Message m) + { + try + { + return ((TextMessage) m).getText(); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return e.toString(); + } + } + + private static boolean checkText(Message m, String s) + { + try + { + return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return false; + } + } +} + diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java new file mode 100644 index 0000000000..34360d6708 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.log4j.Logger; + +public class MessageFactoryException extends Exception { + + private int _errorCode; + + public MessageFactoryException(String message) + { + super(message); + } + + public MessageFactoryException(String msg, Throwable t) + { + super(msg, t); + } + + public MessageFactoryException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public MessageFactoryException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public MessageFactoryException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public MessageFactoryException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public MessageFactoryException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } +} + diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java new file mode 100644 index 0000000000..8784d340da --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.log4j.Logger; +import org.apache.log4j.BasicConfigurator; + +import javax.jms.*; + +import java.util.Properties; + +/** + * Class that sends heartbeat messages to allow monitoring of message consumption + * Sends regular (currently 20 seconds apart) heartbeat message + */ +public class MonitorMessageDispatcher { + + private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class); + + protected static MonitorPublisher _monitorPublisher = null; + + protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; + + /** + * Easy entry point for running a message dispatcher for monitoring consumption + * @param args + */ + public static void main(String[] args) + { + + //Switch on logging appropriately for your app + BasicConfigurator.configure(); + + try + { + while(true) + { + try + { + //endlessly publish messages to monitor queue + publish(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Dispatched monitor message"); + } + + //sleep for twenty seconds and then publish again - change if appropriate + Thread.sleep(20000); + } + catch(UndeliveredMessageException a) + { + //trigger application specific failure handling here + _logger.error("Problem delivering monitor message"); + break; + } + } + } + catch(Exception e) + { + _logger.error("Error trying to dispatch AMS monitor message: " + e); + System.exit(1); + } + finally + { + if (getMonitorPublisher() != null) + { + getMonitorPublisher().cleanup(); + } + } + + System.exit(1); + } + + /** + * Publish heartbeat message + * @throws JMSException + * @throws UndeliveredMessageException + */ + public static void publish() throws JMSException, UndeliveredMessageException + { + //Send the message generated from the payload using the _publisher + getMonitorPublisher().sendImmediateMessage + (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); + } + + /** + * Cleanup publishers + */ + public static void cleanup() + { + if (getMonitorPublisher() != null) + { + getMonitorPublisher().cleanup(); + } + + if (getMonitorPublisher() != null) + { + getMonitorPublisher().cleanup(); + } + } + + //Returns a _publisher for the monitor queue + private static MonitorPublisher getMonitorPublisher() + { + if (_monitorPublisher != null) + { + return _monitorPublisher; + } + + //Create a _publisher using failover details and constant for monitor queue + _monitorPublisher = new MonitorPublisher(); + + _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); + return _monitorPublisher; + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java new file mode 100644 index 0000000000..233c3fea0a --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import javax.jms.Message; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import org.apache.qpid.client.BasicMessageProducer; +import org.apache.log4j.Logger; + +/** + * Subclass of Publisher which uses QPID functionality to send a heartbeat message + * Note immediate flag not available via JMS MessageProducer + */ +public class MonitorPublisher extends Publisher +{ + + private static final Logger _log = Logger.getLogger(Publisher.class); + + BasicMessageProducer _producer; + + public MonitorPublisher() + { + super(); + } + + /* + * Publishes a non-persistent message using transacted session + */ + public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException + { + try + { + _producer = (BasicMessageProducer)_session.createProducer(_destination); + + //Send message via our producer which is not persistent and is immediate + //NB: not available via jms interface MessageProducer + _producer.send(message, DeliveryMode.NON_PERSISTENT, true); + + //commit the message send and close the transaction + _session.commit(); + + } + catch (JMSException e) + { + //Have to assume our commit failed but do not rollback here as channel closed + _log.error(e); + e.printStackTrace(); + throw new UndeliveredMessageException("Cannot deliver immediate message",e); + } + + _log.info(_name + " finished sending message: " + message); + return true; + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java new file mode 100644 index 0000000000..be42e0e413 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnectionFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.DeliveryMode; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Connection; +import javax.jms.Session; + +import javax.naming.InitialContext; + +import org.apache.qpid.example.shared.InitialContextHelper; + +public class Publisher +{ + private static final Logger _log = Logger.getLogger(Publisher.class); + + protected InitialContextHelper _contextHelper; + + protected Connection _connection; + + protected Session _session; + + protected MessageProducer _producer; + + protected String _destinationDir; + + protected String _name = "Publisher"; + + protected Queue _destination; + + protected static final String _defaultDestinationDir = "/tmp"; + + /** + * Creates a Publisher instance using properties from example.properties + * See InitialContextHelper for details of how context etc created + */ + public Publisher() + { + try + { + //get an initial context from default properties + _contextHelper = new InitialContextHelper(null); + InitialContext ctx = _contextHelper.getInitialContext(); + + //then create a connection using the AMQConnectionFactory + AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local"); + _connection = cf.createConnection(); + + //create a transactional session + _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //lookup the example queue and use it + //Queue is non-exclusive and not deleted when last consumer detaches + _destination = _session.createQueue((String)ctx.lookup("MyQueue")); + + //create a message producer + _producer = _session.createProducer(_destination); + + //set destination dir for files that have been processed + _destinationDir = _defaultDestinationDir; + + _connection.start(); + } + catch (Exception e) + { + e.printStackTrace(); + _log.error(e); + } + } + + /** + * Publishes a non-persistent message using transacted session + * Note that persistent is the default mode for send - so need to specify for transient + */ + public boolean sendMessage(Message message) + { + try + { + //Send message via our producer which is not persistent + _producer.send(message, DeliveryMode.NON_PERSISTENT, _producer.getPriority(), _producer.getTimeToLive()); + + //commit the message send and close the transaction + _session.commit(); + + } + catch (JMSException e) + { + //Have to assume our commit failed and rollback here + try + { + _session.rollback(); + _log.error(e); + e.printStackTrace(); + return false; + } + catch (JMSException j) + { + _log.error("Unable to rollback publish transaction ",e); + return false; + } + } + + _log.info(_name + " finished sending message: " + message); + return true; + } + + /** + * Cleanup resources before exit + */ + public void cleanup() + { + try + { + if (_connection != null) + { + _connection.stop(); + _connection.close(); + } + _connection = null; + _producer = null; + } + catch(Exception e) + { + _log.error("Error trying to cleanup publisher " + e); + System.exit(1); + } + } + + /** + * Exposes session + * @return Session + */ + public Session getSession() + { + return _session; + } + + public String getDestinationDir() + { + return _destinationDir; + } + + public void setDestinationDir(String destinationDir) + { + _destinationDir = destinationDir; + } + + public String getName() + { + return _name; + } + + public void setName(String _name) { + this._name = _name; + } +} + diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java new file mode 100644 index 0000000000..3335833c2d --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.log4j.Logger; + +/** + * Exception thrown by monitor when cannot send a message marked for immediate delivery + */ +public class UndeliveredMessageException extends Exception { + + private int _errorCode; + + public UndeliveredMessageException(String message) + { + super(message); + } + + public UndeliveredMessageException(String msg, Throwable t) + { + super(msg, t); + } + + public UndeliveredMessageException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public UndeliveredMessageException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public UndeliveredMessageException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public UndeliveredMessageException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public UndeliveredMessageException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java new file mode 100644 index 0000000000..8723983862 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.shared; + +import org.apache.log4j.Logger; + +public class ConnectionException extends Exception { + + private int _errorCode; + + public ConnectionException(String message) + { + super(message); + } + + public ConnectionException(String msg, Throwable t) + { + super(msg, t); + } + + public ConnectionException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public ConnectionException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public ConnectionException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public ConnectionException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public ConnectionException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java new file mode 100644 index 0000000000..787cecd541 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.shared; + +import org.apache.log4j.Logger; + +public class ContextException extends Exception { + + private int _errorCode; + + public ContextException(String message) + { + super(message); + } + + public ContextException(String msg, Throwable t) + { + super(msg, t); + } + + public ContextException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public ContextException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public ContextException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public ContextException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public ContextException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java new file mode 100644 index 0000000000..54446cb6a7 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.shared; + +import java.io.*; + +/** + * Class that provides file related utility methods for utility use + */ +public class FileUtils { + + + //Reads file content into String + public static String getFileContent(String filePath) throws IOException + { + + BufferedReader reader = null; + String tempData = ""; + String eol = "\n\r"; + + try + { + String line; + reader = new BufferedReader(new FileReader(filePath)); + while ((line = reader.readLine()) != null) + { + if (!tempData.equals("")) + { + tempData = tempData + eol + line; + } + else + { + tempData = line; + } + } + } + finally + { + if (reader != null) + { + reader.close(); + } + } + return tempData; + } + + /* + * Reads xml from a file and returns it as an array of chars + */ + public static char[] getFileAsCharArray(String filePath) throws IOException + { + BufferedReader reader = null; + char[] tempChars = null; + String tempData = ""; + + try + { + String line; + reader = new BufferedReader(new FileReader(filePath)); + while ((line = reader.readLine()) != null) + { + tempData = tempData + line; + } + tempChars = tempData.toCharArray(); + } + finally + { + if (reader != null) + { + reader.close(); + } + } + return tempChars; + } + + /* + * Write String content to filename provided + */ + public static void writeStringToFile(String content, String path) throws IOException + { + + BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path))); + writer.write(content); + writer.flush(); + writer.close(); + } + + /* + * Allows moving of files to a new dir and preserves the last bit of the name only + */ + public static void moveFileToNewDir(String path, String newDir) throws IOException + { + //get file name from current path + //while more files in dir publish them + File pathFile = new File(path); + if (pathFile.isDirectory()) + { + File[] files = pathFile.listFiles(); + for (File file : files) + { + moveFileToNewDir(file,newDir); + } + } + } + + /* + * Allows moving of a file to a new dir and preserves the last bit of the name only + */ + public static void moveFileToNewDir(File fileToMove, String newDir) throws IOException + { + moveFile(fileToMove,getArchiveFileName(fileToMove,newDir)); + } + + /* + * Moves file from a given path to a new path with String params + */ + public static void moveFile(String fromPath, String dest) throws IOException + { + moveFile(new File(fromPath),new File(dest)); + } + + /* + * Moves file from a given path to a new path with mixed params + */ + public static void moveFile(File fileToMove, String dest) throws IOException + { + moveFile(fileToMove,new File(dest)); + } + + /* + * Moves file from a given path to a new path with File params + */ + public static void moveFile(File fileToMove, File dest) throws IOException + { + fileToMove.renameTo(dest); + } + + /* + * Deletes a given file + */ + public static void deleteFile(String filePath) throws IOException + { + new File(filePath).delete(); + } + + private static String getArchiveFileName(File fileToMove, String archiveDir) + { + //get file name from current path + String fileName = fileToMove.getName(); + return archiveDir + File.separator + fileName; + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java new file mode 100644 index 0000000000..b39892b688 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.shared; + +import org.apache.log4j.Logger; + +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; +import java.io.InputStream; +import java.io.IOException; + +/** + * Class that provides helper methods for JNDI + */ +public class InitialContextHelper { + + public static final String _defaultPropertiesName = "example.properties"; + protected static Properties _fileProperties; + protected static InitialContext _initialContext; + protected static final Logger _log = Logger.getLogger(InitialContextHelper.class); + + public InitialContextHelper(String propertiesName) throws ContextException + { + try + { + if (propertiesName == null || propertiesName.length() == 0) + { + propertiesName = _defaultPropertiesName; + } + + _fileProperties = new Properties(); + ClassLoader cl = this.getClass().getClassLoader(); + + //NB: Need to change path to reflect package if moving classes around ! + InputStream is = cl.getResourceAsStream("org/apache/qpid/example/shared/" + propertiesName); + _fileProperties.load(is); + _initialContext = new InitialContext(_fileProperties); + } + catch (IOException e) + { + throw new ContextException(_log, e.toString()); + } + catch (NamingException n) + { + throw new ContextException(_log, n.toString()); + } + } + + public Properties getFileProperties() + { + return _fileProperties; + } + + public InitialContext getInitialContext() + { + return _initialContext; + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java new file mode 100644 index 0000000000..c056f8a7da --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.shared; + +/** + * Constants used by AMS Publisher/Subscriber classes + */ +public class Statics { + + public static final String TOPIC_NAME = "EXAMPLE_TOPIC"; + + public static final String QUEUE_NAME = "EXAMPLE_QUEUE"; + + public static final String MONITOR_QUEUE_SUFFIX = "_MONITOR"; + + public static final String HOST_PROPERTY = "host"; + + public static final String PORT_PROPERTY = "port"; + + public static final String USER_PROPERTY = "user"; + + public static final String PWD_PROPERTY = "pwd"; + + public static final String TOPIC_PROPERTY = "topic"; + + public static final String QUEUE_PROPERTY = "queue"; + + public static final String VIRTUAL_PATH_PROPERTY = "virtualpath"; + + public static final String ARCHIVE_PATH = "archivepath"; + + public static final String CLIENT_PROPERTY = "client"; + + public static final String FILENAME_PROPERTY = "filename"; + + public static final String DEFAULT_USER = "guest"; + + public static final String DEFAULT_PWD = "guest"; + + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties b/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties new file mode 100644 index 0000000000..82de41908f --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties @@ -0,0 +1,21 @@ +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# use the following property to configure the default connector +#java.naming.provider.url - ignored. + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='vm://:1' + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.MyQueue = example.MyQueue + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.ibmStocks = stocks.nyse.ibm + +# Register an AMQP destination in JNDI +# NOTE: Qpid currently only supports direct,topics and headers +# destination.[jniName] = [BindingURL] +destination.direct = direct://amq.direct//directQueue diff --git a/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java new file mode 100644 index 0000000000..9c195aef40 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +import org.apache.log4j.Logger; +import org.apache.qpid.example.shared.Statics; + +import javax.jms.*; + +/** + * Subclass of Subscriber which consumes a heartbeat message + */ + +public class MonitoredSubscriber extends Subscriber +{ + protected String _monitorDestinationName; + + private static final Logger _logger = Logger.getLogger(MonitoredSubscriber.class); + + private static MessageConsumer _monitorConsumer; + + public MonitoredSubscriber() + { + super(); + //lookup queue name and append suffix + _monitorDestinationName = _destinationName + Statics.MONITOR_QUEUE_SUFFIX; + } + + /** + * MessageListener implementation for this subscriber + */ + public static class MonitorMessageListener implements MessageListener + { + private String _name; + + public MonitorMessageListener(String name) + { + _name = name; + + } + + /** + * Listens for heartbeat messages and acknowledges them + * @param message + */ + public void onMessage(javax.jms.Message message) + { + _logger.info(_name + " monitor got message '" + message + "'"); + + try + { + _logger.debug("Monitor acknowledging recieved message"); + + //Now acknowledge the message to clear it from our queue + message.acknowledge(); + } + catch(JMSException j) + { + _logger.error("Monitor caught JMSException trying to acknowledge message receipt"); + j.printStackTrace(); + } + catch(Exception e) + { + _logger.error("Monitor caught unexpected exception trying to handle message"); + e.printStackTrace(); + } + } + } + + /** + * Subscribes to Queue and attaches additional monitor listener + */ + public void subscribeAndMonitor() + { + try + { + _connection = _connectionFactory.createConnection(); + + //create a transactional session + Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //Queue is non-exclusive and not deleted when last consumer detaches + Destination destination = session.createQueue(_monitorDestinationName); + + //Create a consumer with a destination of our queue which will use defaults for prefetch etc + _monitorConsumer = session.createConsumer(destination); + + //give the monitor message listener a name of it's own + _monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener + ("MonitorListener " + System.currentTimeMillis())); + + MonitoredSubscriber._logger.info("Starting monitored subscription ..."); + + MonitoredSubscriber._connection.start(); + + //and now start ordinary consumption too + subscribe(); + } + catch (Throwable t) + { + _logger.error("Fatal error: " + t); + t.printStackTrace(); + } + } + + /** + * Stop consuming + */ + public void stopMonitor() + { + try + { + _monitorConsumer.close(); + _monitorConsumer = null; + stop(); + } + catch(JMSException j) + { + _logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); + } + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java new file mode 100644 index 0000000000..d2f27da052 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +import org.apache.log4j.BasicConfigurator; + +/** + * Allows you to simply start a monitored subscriber + */ +public class MonitoredSubscriptionWrapper { + + private static MonitoredSubscriber _subscriber; + + /** + * Create a monitored subscriber and start it + * @param args - no params required + */ + public static void main(String args[]) + { + //switch on logging + BasicConfigurator.configure(); + + _subscriber = new MonitoredSubscriber(); + + _subscriber.subscribe(); + } + + /** + * Stop subscribing now ... + */ + public static void stop() + { + _subscriber.stop(); + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java new file mode 100644 index 0000000000..34c7d6c7bb --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnectionFactory; + +import javax.jms.*; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.naming.InitialContext; + +import org.apache.qpid.example.shared.InitialContextHelper; + +/** + * Subscriber which consumes messages from a queue + */ + +public class Subscriber +{ + private static final Logger _log = Logger.getLogger(Subscriber.class); + + protected static Connection _connection; + + protected static MessageConsumer _consumer; + + protected static InitialContextHelper _contextHelper; + + protected static AMQConnectionFactory _connectionFactory; + + protected String _destinationName; + + public Subscriber() + { + try + { + //get an initial context from default properties + _contextHelper = new InitialContextHelper(null); + InitialContext ctx = _contextHelper.getInitialContext(); + + //then create a connection using the AMQConnectionFactory + _connectionFactory = (AMQConnectionFactory) ctx.lookup("local"); + + //lookup queue name + _destinationName = (String) ctx.lookup("MyQueue"); + + } + catch (Exception e) + { + e.printStackTrace(); + _log.error(e); + } + } + + /** + * Listener class that handles messages + */ + public static class ExampleMessageListener implements MessageListener + { + private String _name; + + public ExampleMessageListener(String name) + { + _name = name; + + } + + /** + * Listens for message callbacks, handles and then acknowledges them + * @param message - the message received + */ + public void onMessage(javax.jms.Message message) + { + _log.info(_name + " got message '" + message + "'"); + + try + { + //NB: Handle your message appropriately for your application here + //do some stuff + + _log.debug("Acknowledging recieved message"); + + //Now acknowledge the message to clear it from our queue + message.acknowledge(); + } + catch(JMSException j) + { + _log.error("JMSException trying to acknowledge message receipt"); + j.printStackTrace(); + } + catch(Exception e) + { + _log.error("Unexpected exception trying to handle message"); + e.printStackTrace(); + } + } + } + + /** + * Subscribes to example Queue and attaches listener + */ + public void subscribe() + { + _log.info("Starting subscription ..."); + + try + { + _connection = _connectionFactory.createConnection(); + + //create a transactional session + Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //Queue is non-exclusive and not deleted when last consumer detaches + Destination destination = session.createQueue(_destinationName); + + //Create a consumer with a destination of our queue which will use defaults for prefetch etc + _consumer = session.createConsumer(destination); + + //give the message listener a name of it's own + _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis())); + + _connection.start(); + } + catch (Throwable t) + { + _log.error("Fatal error: " + t); + t.printStackTrace(); + } + + _log.info("Waiting for messages ..."); + + //wait for messages and sleep to survive failover + try + { + while(true) + { + Thread.sleep(Long.MAX_VALUE); + } + } + catch (Exception e) + { + _log.warn("Exception while Subscriber sleeping",e); + } + } + + /** + * Set destination (queue or topic) name + * @param name + */ + public void setDestinationName(String name) + { + _destinationName = name; + } + + /** + * Stop consuming and close connection + */ + public void stop() + { + try + { + _consumer.close(); + _consumer = null; + _connection.stop(); + _connection.close(); + } + catch(JMSException j) + { + _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); + } + } + +} + + + + diff --git a/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java new file mode 100644 index 0000000000..32a0ef685c --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +import org.apache.log4j.BasicConfigurator; + +/** + * Allows you to simply start a subscriber + */ +public class SubscriptionWrapper { + + private static Subscriber _subscriber; + + /** + * Create a subscriber and start it + * @param args + */ + public static void main(String args[]) + { + //switch on logging + BasicConfigurator.configure(); + + _subscriber = new Subscriber(); + + _subscriber.subscribe(); + } + + /** + * Stop subscribing now ... + */ + public static void stop() + { + _subscriber.stop(); + } +} |