From 6d5154b0d5ad1fb32aadda06801c5dc8fdc958eb Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Thu, 21 Dec 2006 14:24:38 +0000 Subject: Merge from trunk up to rev 486165 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@489368 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/HeadersExchange.java | 13 +- java/client/example/pom.xml | 94 ++++ .../main/java/org/apache/qpid/example/log4j.xml | 45 ++ .../example/publisher/FileMessageDispatcher.java | 159 ++++++ .../qpid/example/publisher/FileMessageFactory.java | 134 +++++ .../example/publisher/MessageFactoryException.java | 72 +++ .../publisher/MonitorMessageDispatcher.java | 134 +++++ .../qpid/example/publisher/MonitorPublisher.java | 71 +++ .../apache/qpid/example/publisher/Publisher.java | 181 ++++++ .../publisher/UndeliveredMessageException.java | 74 +++ .../qpid/example/shared/ConnectionException.java | 71 +++ .../qpid/example/shared/ContextException.java | 73 +++ .../org/apache/qpid/example/shared/FileUtils.java | 168 ++++++ .../qpid/example/shared/InitialContextHelper.java | 78 +++ .../org/apache/qpid/example/shared/Statics.java | 57 ++ .../apache/qpid/example/shared/example.properties | 21 + .../example/subscriber/MonitoredSubscriber.java | 139 +++++ .../subscriber/MonitoredSubscriptionWrapper.java | 51 ++ .../apache/qpid/example/subscriber/Subscriber.java | 181 ++++++ .../example/subscriber/SubscriptionWrapper.java | 51 ++ .../qpid/client/message/AbstractBytesMessage.java | 19 +- .../qpid/client/message/AbstractJMSMessage.java | 21 +- .../qpid/client/message/JMSStreamMessage.java | 611 +++++++++++++-------- .../IBMPerfTest/JNDIBindConnectionFactory.java | 8 +- .../org/apache/qpid/IBMPerfTest/JNDIBindQueue.java | 21 +- .../org/apache/qpid/IBMPerfTest/JNDIBindTopic.java | 21 +- .../java/org/apache/qpid/example/log4j.xml | 45 -- .../example/publisher/FileMessageDispatcher.java | 159 ------ .../qpid/example/publisher/FileMessageFactory.java | 134 ----- .../example/publisher/MessageFactoryException.java | 72 --- .../publisher/MonitorMessageDispatcher.java | 134 ----- .../qpid/example/publisher/MonitorPublisher.java | 71 --- .../apache/qpid/example/publisher/Publisher.java | 181 ------ .../publisher/UndeliveredMessageException.java | 74 --- .../qpid/example/shared/ConnectionException.java | 71 --- .../qpid/example/shared/ContextException.java | 73 --- .../org/apache/qpid/example/shared/FileUtils.java | 168 ------ .../qpid/example/shared/InitialContextHelper.java | 78 --- .../org/apache/qpid/example/shared/Statics.java | 57 -- .../apache/qpid/example/shared/example.properties | 21 - .../example/subscriber/MonitoredSubscriber.java | 139 ----- .../subscriber/MonitoredSubscriptionWrapper.java | 51 -- .../apache/qpid/example/subscriber/Subscriber.java | 194 ------- .../example/subscriber/SubscriptionWrapper.java | 51 -- .../unit/client/message/StreamMessageTest.java | 57 +- .../exchange/AbstractHeadersExchangeTestBase.java | 42 +- .../qpid/server/exchange/HeadersExchangeTest.java | 28 + .../ReturnUnroutableMandatoryMessageTest.java | 151 +++++ 48 files changed, 2526 insertions(+), 2093 deletions(-) create mode 100644 java/client/example/pom.xml create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/log4j.xml create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java create mode 100644 java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/log4j.xml delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageFactory.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/publisher/MessageFactoryException.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorPublisher.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/publisher/Publisher.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/shared/ConnectionException.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/shared/ContextException.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/shared/FileUtils.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/shared/InitialContextHelper.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/shared/Statics.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/shared/example.properties delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java delete mode 100644 java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 818e5b04b2..dbf4ef91db 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -206,7 +206,18 @@ public class HeadersExchange extends AbstractExchange } if (!routed) { - _logger.warn("Exchange " + getName() + ": message not routable."); + + String msg = "Exchange " + getName() + ": message not routable."; + + if (payload.getPublishBody().mandatory) + { + throw new NoRouteException(msg, payload); + } + else + { + _logger.warn(msg); + } + } } diff --git a/java/client/example/pom.xml b/java/client/example/pom.xml new file mode 100644 index 0000000000..ac0081c00b --- /dev/null +++ b/java/client/example/pom.xml @@ -0,0 +1,94 @@ + + + 4.0.0 + org.apache.qpid + qpid-example + jar + 1.0-incubating-M2-SNAPSHOT + Qpid Example + http://cwiki.apache.org/confluence/display/qpid + + + org.apache.qpid + qpid + 1.0-incubating-M2-SNAPSHOT + + + + .. + warn + + + + + org.apache.qpid + qpid-common + + + org.apache.qpid + qpid-client + + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + + + + jmscts + jmscts + 0.5-b2 + test + + + jms + jms + + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + amqj.noAutoCreateVMBroker + true + + + amqj.logging.level + ${amqj.logging.level} + + + log4j.configuration + file:///${basedir}/src/main/java/log4j.properties + + + + + + + diff --git a/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml b/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml new file mode 100644 index 0000000000..de64423a51 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java new file mode 100644 index 0000000000..b199d41432 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.log4j.Logger; + +import java.io.File; + +import org.apache.qpid.example.shared.FileUtils; +import org.apache.qpid.example.shared.Statics; + +import javax.jms.JMSException; + +/** + * Class that sends message files to the Publisher to distribute + * using files as input + * Must set properties for host in properties file or uses in vm broker + */ +public class FileMessageDispatcher { + + protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); + + protected static Publisher _publisher = null; + + /** + * To use this main method you need to specify a path or file to use for input + * This class then uses file contents from the dir/file specified to generate + * messages to publish + * Intended to be a very simple way to get going with publishing using the broker + * @param args - must specify one value, the path to file(s) for publisher + */ + public static void main(String[] args) + { + + //Check command line args ok - must provide a path or file for us to dispatch + if (args.length == 0) + { + System.err.println("Usage: FileMessageDispatcher " + ""); + } + else + { + try + { + //publish message(s) from file(s) to configured queue + publish(args[0]); + + //Move payload file(s) to archive location as no error + FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH)); + } + catch(Exception e) + { + //log error and exit + _logger.error("Error trying to dispatch message: " + e); + System.exit(1); + } + finally + { + //clean up before exiting + if (getPublisher() != null) + { + getPublisher().cleanup(); + } + } + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Finished dispatching message"); + } + + System.exit(0); + } + + /** + * Publish the content of a file or files from a directory as messages + * @param path - from main args + * @throws JMSException + * @throws MessageFactoryException - if cannot create message from file content + */ + public static void publish(String path) throws JMSException, MessageFactoryException + { + File tempFile = new File(path); + if (tempFile.isDirectory()) + { + //while more files in dir publish them + File[] files = tempFile.listFiles(); + + if (files == null || files.length == 0) + { + _logger.info("FileMessageDispatcher - No files to publish in input directory: " + tempFile); + } + else + { + for (File file : files) + { + //Create message factory passing in payload path + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString()); + + //Send the message generated from the payload using the _publisher + getPublisher().sendMessage(factory.createEventMessage()); + + } + } + } + else + { + //handle a single file + //Create message factory passing in payload path + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString()); + + //Send the message generated from the payload using the _publisher + getPublisher().sendMessage(factory.createEventMessage()); + } + } + + /** + * Cleanup before exit + */ + public static void cleanup() + { + if (getPublisher() != null) + { + getPublisher().cleanup(); + } + } + + /** + * @return A Publisher instance + */ + private static Publisher getPublisher() + { + if (_publisher != null) + { + return _publisher; + } + + //Create a _publisher + _publisher = new Publisher(); + + return _publisher; + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java new file mode 100644 index 0000000000..88bcbbbccb --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.example.publisher; + +import org.apache.qpid.example.shared.FileUtils; +import org.apache.qpid.example.shared.Statics; + +import java.io.*; +import javax.jms.*; + +public class FileMessageFactory +{ + protected final Session _session; + protected final String _payload; + protected final String _filename; + + /** + * Contructs and instance using a filename from which content will be used to create message + * @param session + * @param filename + * @throws MessageFactoryException + */ + public FileMessageFactory(Session session, String filename) throws MessageFactoryException + { + try + { + _filename = filename; + _payload = FileUtils.getFileContent(filename); + _session = session; + } + catch (IOException e) + { + throw new MessageFactoryException(e.toString()); + } + } + + /** + * Creates a text message and sets filename property on it + * The filename property is purely intended to provide visibility + * of file content passing trhough the broker using example classes + * @return Message - a TextMessage with content from file + * @throws JMSException + */ + public Message createEventMessage() throws JMSException + { + TextMessage msg = _session.createTextMessage(); + msg.setText(_payload); + msg.setStringProperty(Statics.FILENAME_PROPERTY,new File(_filename).getName()); + return msg; + } + + /** + * Creates message from a string for use by the monitor + * @param session + * @param textMsg - message content + * @return Message - TextMessage with content from String + * @throws JMSException + */ + public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException + { + TextMessage msg = session.createTextMessage(); + msg.setText(textMsg); + return msg; + } + + public Message createShutdownMessage() throws JMSException + { + return _session.createTextMessage("SHUTDOWN"); + } + + public Message createReportRequestMessage() throws JMSException + { + return _session.createTextMessage("REPORT"); + } + + public Message createReportResponseMessage(String msg) throws JMSException + { + return _session.createTextMessage(msg); + } + + public boolean isShutdown(Message m) + { + return checkText(m, "SHUTDOWN"); + } + + public boolean isReport(Message m) + { + return checkText(m, "REPORT"); + } + + public Object getReport(Message m) + { + try + { + return ((TextMessage) m).getText(); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return e.toString(); + } + } + + private static boolean checkText(Message m, String s) + { + try + { + return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return false; + } + } +} + diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java new file mode 100644 index 0000000000..34360d6708 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.log4j.Logger; + +public class MessageFactoryException extends Exception { + + private int _errorCode; + + public MessageFactoryException(String message) + { + super(message); + } + + public MessageFactoryException(String msg, Throwable t) + { + super(msg, t); + } + + public MessageFactoryException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public MessageFactoryException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public MessageFactoryException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public MessageFactoryException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public MessageFactoryException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } +} + diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java new file mode 100644 index 0000000000..8784d340da --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.log4j.Logger; +import org.apache.log4j.BasicConfigurator; + +import javax.jms.*; + +import java.util.Properties; + +/** + * Class that sends heartbeat messages to allow monitoring of message consumption + * Sends regular (currently 20 seconds apart) heartbeat message + */ +public class MonitorMessageDispatcher { + + private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class); + + protected static MonitorPublisher _monitorPublisher = null; + + protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; + + /** + * Easy entry point for running a message dispatcher for monitoring consumption + * @param args + */ + public static void main(String[] args) + { + + //Switch on logging appropriately for your app + BasicConfigurator.configure(); + + try + { + while(true) + { + try + { + //endlessly publish messages to monitor queue + publish(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Dispatched monitor message"); + } + + //sleep for twenty seconds and then publish again - change if appropriate + Thread.sleep(20000); + } + catch(UndeliveredMessageException a) + { + //trigger application specific failure handling here + _logger.error("Problem delivering monitor message"); + break; + } + } + } + catch(Exception e) + { + _logger.error("Error trying to dispatch AMS monitor message: " + e); + System.exit(1); + } + finally + { + if (getMonitorPublisher() != null) + { + getMonitorPublisher().cleanup(); + } + } + + System.exit(1); + } + + /** + * Publish heartbeat message + * @throws JMSException + * @throws UndeliveredMessageException + */ + public static void publish() throws JMSException, UndeliveredMessageException + { + //Send the message generated from the payload using the _publisher + getMonitorPublisher().sendImmediateMessage + (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); + } + + /** + * Cleanup publishers + */ + public static void cleanup() + { + if (getMonitorPublisher() != null) + { + getMonitorPublisher().cleanup(); + } + + if (getMonitorPublisher() != null) + { + getMonitorPublisher().cleanup(); + } + } + + //Returns a _publisher for the monitor queue + private static MonitorPublisher getMonitorPublisher() + { + if (_monitorPublisher != null) + { + return _monitorPublisher; + } + + //Create a _publisher using failover details and constant for monitor queue + _monitorPublisher = new MonitorPublisher(); + + _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); + return _monitorPublisher; + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java new file mode 100644 index 0000000000..233c3fea0a --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import javax.jms.Message; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import org.apache.qpid.client.BasicMessageProducer; +import org.apache.log4j.Logger; + +/** + * Subclass of Publisher which uses QPID functionality to send a heartbeat message + * Note immediate flag not available via JMS MessageProducer + */ +public class MonitorPublisher extends Publisher +{ + + private static final Logger _log = Logger.getLogger(Publisher.class); + + BasicMessageProducer _producer; + + public MonitorPublisher() + { + super(); + } + + /* + * Publishes a non-persistent message using transacted session + */ + public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException + { + try + { + _producer = (BasicMessageProducer)_session.createProducer(_destination); + + //Send message via our producer which is not persistent and is immediate + //NB: not available via jms interface MessageProducer + _producer.send(message, DeliveryMode.NON_PERSISTENT, true); + + //commit the message send and close the transaction + _session.commit(); + + } + catch (JMSException e) + { + //Have to assume our commit failed but do not rollback here as channel closed + _log.error(e); + e.printStackTrace(); + throw new UndeliveredMessageException("Cannot deliver immediate message",e); + } + + _log.info(_name + " finished sending message: " + message); + return true; + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java new file mode 100644 index 0000000000..2bde4ec35c --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnectionFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.DeliveryMode; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Connection; +import javax.jms.Session; + +import javax.naming.InitialContext; + +import org.apache.qpid.example.shared.InitialContextHelper; + +public class Publisher +{ + private static final Logger _log = Logger.getLogger(Publisher.class); + + protected InitialContextHelper _contextHelper; + + protected Connection _connection; + + protected Session _session; + + protected MessageProducer _producer; + + protected String _destinationDir; + + protected String _name = "Publisher"; + + protected Queue _destination; + + protected static final String _defaultDestinationDir = "/tmp"; + + /** + * Creates a Publisher instance using properties from example.properties + * See InitialContextHelper for details of how context etc created + */ + public Publisher() + { + try + { + //get an initial context from default properties + _contextHelper = new InitialContextHelper(null); + InitialContext ctx = _contextHelper.getInitialContext(); + + //then create a connection using the AMQConnectionFactory + AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local"); + _connection = cf.createConnection(); + + //create a transactional session + _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //lookup the example queue and use it + //Queue is non-exclusive and not deleted when last consumer detaches + _destination = (Queue) ctx.lookup("MyQueue"); + + //create a message producer + _producer = _session.createProducer(_destination); + + //set destination dir for files that have been processed + _destinationDir = _defaultDestinationDir; + + _connection.start(); + } + catch (Exception e) + { + e.printStackTrace(); + _log.error(e); + } + } + + /** + * Publishes a non-persistent message using transacted session + * Note that persistent is the default mode for send - so need to specify for transient + */ + public boolean sendMessage(Message message) + { + try + { + //Send message via our producer which is not persistent + _producer.send(message, DeliveryMode.NON_PERSISTENT, _producer.getPriority(), _producer.getTimeToLive()); + + //commit the message send and close the transaction + _session.commit(); + + } + catch (JMSException e) + { + //Have to assume our commit failed and rollback here + try + { + _session.rollback(); + _log.error(e); + e.printStackTrace(); + return false; + } + catch (JMSException j) + { + _log.error("Unable to rollback publish transaction ",e); + return false; + } + } + + _log.info(_name + " finished sending message: " + message); + return true; + } + + /** + * Cleanup resources before exit + */ + public void cleanup() + { + try + { + if (_connection != null) + { + _connection.stop(); + _connection.close(); + } + _connection = null; + _producer = null; + } + catch(Exception e) + { + _log.error("Error trying to cleanup publisher " + e); + System.exit(1); + } + } + + /** + * Exposes session + * @return Session + */ + public Session getSession() + { + return _session; + } + + public String getDestinationDir() + { + return _destinationDir; + } + + public void setDestinationDir(String destinationDir) + { + _destinationDir = destinationDir; + } + + public String getName() + { + return _name; + } + + public void setName(String _name) { + this._name = _name; + } +} + diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java new file mode 100644 index 0000000000..3335833c2d --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.log4j.Logger; + +/** + * Exception thrown by monitor when cannot send a message marked for immediate delivery + */ +public class UndeliveredMessageException extends Exception { + + private int _errorCode; + + public UndeliveredMessageException(String message) + { + super(message); + } + + public UndeliveredMessageException(String msg, Throwable t) + { + super(msg, t); + } + + public UndeliveredMessageException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public UndeliveredMessageException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public UndeliveredMessageException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public UndeliveredMessageException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public UndeliveredMessageException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java new file mode 100644 index 0000000000..8723983862 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.shared; + +import org.apache.log4j.Logger; + +public class ConnectionException extends Exception { + + private int _errorCode; + + public ConnectionException(String message) + { + super(message); + } + + public ConnectionException(String msg, Throwable t) + { + super(msg, t); + } + + public ConnectionException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public ConnectionException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public ConnectionException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public ConnectionException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public ConnectionException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java new file mode 100644 index 0000000000..787cecd541 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.shared; + +import org.apache.log4j.Logger; + +public class ContextException extends Exception { + + private int _errorCode; + + public ContextException(String message) + { + super(message); + } + + public ContextException(String msg, Throwable t) + { + super(msg, t); + } + + public ContextException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public ContextException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public ContextException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public ContextException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public ContextException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java new file mode 100644 index 0000000000..54446cb6a7 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.shared; + +import java.io.*; + +/** + * Class that provides file related utility methods for utility use + */ +public class FileUtils { + + + //Reads file content into String + public static String getFileContent(String filePath) throws IOException + { + + BufferedReader reader = null; + String tempData = ""; + String eol = "\n\r"; + + try + { + String line; + reader = new BufferedReader(new FileReader(filePath)); + while ((line = reader.readLine()) != null) + { + if (!tempData.equals("")) + { + tempData = tempData + eol + line; + } + else + { + tempData = line; + } + } + } + finally + { + if (reader != null) + { + reader.close(); + } + } + return tempData; + } + + /* + * Reads xml from a file and returns it as an array of chars + */ + public static char[] getFileAsCharArray(String filePath) throws IOException + { + BufferedReader reader = null; + char[] tempChars = null; + String tempData = ""; + + try + { + String line; + reader = new BufferedReader(new FileReader(filePath)); + while ((line = reader.readLine()) != null) + { + tempData = tempData + line; + } + tempChars = tempData.toCharArray(); + } + finally + { + if (reader != null) + { + reader.close(); + } + } + return tempChars; + } + + /* + * Write String content to filename provided + */ + public static void writeStringToFile(String content, String path) throws IOException + { + + BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path))); + writer.write(content); + writer.flush(); + writer.close(); + } + + /* + * Allows moving of files to a new dir and preserves the last bit of the name only + */ + public static void moveFileToNewDir(String path, String newDir) throws IOException + { + //get file name from current path + //while more files in dir publish them + File pathFile = new File(path); + if (pathFile.isDirectory()) + { + File[] files = pathFile.listFiles(); + for (File file : files) + { + moveFileToNewDir(file,newDir); + } + } + } + + /* + * Allows moving of a file to a new dir and preserves the last bit of the name only + */ + public static void moveFileToNewDir(File fileToMove, String newDir) throws IOException + { + moveFile(fileToMove,getArchiveFileName(fileToMove,newDir)); + } + + /* + * Moves file from a given path to a new path with String params + */ + public static void moveFile(String fromPath, String dest) throws IOException + { + moveFile(new File(fromPath),new File(dest)); + } + + /* + * Moves file from a given path to a new path with mixed params + */ + public static void moveFile(File fileToMove, String dest) throws IOException + { + moveFile(fileToMove,new File(dest)); + } + + /* + * Moves file from a given path to a new path with File params + */ + public static void moveFile(File fileToMove, File dest) throws IOException + { + fileToMove.renameTo(dest); + } + + /* + * Deletes a given file + */ + public static void deleteFile(String filePath) throws IOException + { + new File(filePath).delete(); + } + + private static String getArchiveFileName(File fileToMove, String archiveDir) + { + //get file name from current path + String fileName = fileToMove.getName(); + return archiveDir + File.separator + fileName; + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java new file mode 100644 index 0000000000..b39892b688 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.shared; + +import org.apache.log4j.Logger; + +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; +import java.io.InputStream; +import java.io.IOException; + +/** + * Class that provides helper methods for JNDI + */ +public class InitialContextHelper { + + public static final String _defaultPropertiesName = "example.properties"; + protected static Properties _fileProperties; + protected static InitialContext _initialContext; + protected static final Logger _log = Logger.getLogger(InitialContextHelper.class); + + public InitialContextHelper(String propertiesName) throws ContextException + { + try + { + if (propertiesName == null || propertiesName.length() == 0) + { + propertiesName = _defaultPropertiesName; + } + + _fileProperties = new Properties(); + ClassLoader cl = this.getClass().getClassLoader(); + + //NB: Need to change path to reflect package if moving classes around ! + InputStream is = cl.getResourceAsStream("org/apache/qpid/example/shared/" + propertiesName); + _fileProperties.load(is); + _initialContext = new InitialContext(_fileProperties); + } + catch (IOException e) + { + throw new ContextException(_log, e.toString()); + } + catch (NamingException n) + { + throw new ContextException(_log, n.toString()); + } + } + + public Properties getFileProperties() + { + return _fileProperties; + } + + public InitialContext getInitialContext() + { + return _initialContext; + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java new file mode 100644 index 0000000000..c056f8a7da --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.shared; + +/** + * Constants used by AMS Publisher/Subscriber classes + */ +public class Statics { + + public static final String TOPIC_NAME = "EXAMPLE_TOPIC"; + + public static final String QUEUE_NAME = "EXAMPLE_QUEUE"; + + public static final String MONITOR_QUEUE_SUFFIX = "_MONITOR"; + + public static final String HOST_PROPERTY = "host"; + + public static final String PORT_PROPERTY = "port"; + + public static final String USER_PROPERTY = "user"; + + public static final String PWD_PROPERTY = "pwd"; + + public static final String TOPIC_PROPERTY = "topic"; + + public static final String QUEUE_PROPERTY = "queue"; + + public static final String VIRTUAL_PATH_PROPERTY = "virtualpath"; + + public static final String ARCHIVE_PATH = "archivepath"; + + public static final String CLIENT_PROPERTY = "client"; + + public static final String FILENAME_PROPERTY = "filename"; + + public static final String DEFAULT_USER = "guest"; + + public static final String DEFAULT_PWD = "guest"; + + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties b/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties new file mode 100644 index 0000000000..7f513341a2 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties @@ -0,0 +1,21 @@ +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# use the following property to configure the default connector +#java.naming.provider.url - ignored. + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.MyQueue = example.MyQueue + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.ibmStocks = stocks.nyse.ibm + +# Register an AMQP destination in JNDI +# NOTE: Qpid currently only supports direct,topics and headers +# destination.[jniName] = [BindingURL] +destination.direct = direct://amq.direct//directQueue diff --git a/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java new file mode 100644 index 0000000000..1d2e5e0e66 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +import org.apache.log4j.Logger; +import org.apache.qpid.example.shared.Statics; + +import javax.jms.*; + +/** + * Subclass of Subscriber which consumes a heartbeat message + */ + +public class MonitoredSubscriber extends Subscriber +{ + protected String _monitorDestinationName; + + private static final Logger _logger = Logger.getLogger(MonitoredSubscriber.class); + + private static MessageConsumer _monitorConsumer; + + public MonitoredSubscriber() + { + super(); + //lookup queue name and append suffix + _monitorDestinationName = _destination.toString() + Statics.MONITOR_QUEUE_SUFFIX; + } + + /** + * MessageListener implementation for this subscriber + */ + public static class MonitorMessageListener implements MessageListener + { + private String _name; + + public MonitorMessageListener(String name) + { + _name = name; + + } + + /** + * Listens for heartbeat messages and acknowledges them + * @param message + */ + public void onMessage(javax.jms.Message message) + { + _logger.info(_name + " monitor got message '" + message + "'"); + + try + { + _logger.debug("Monitor acknowledging recieved message"); + + //Now acknowledge the message to clear it from our queue + message.acknowledge(); + } + catch(JMSException j) + { + _logger.error("Monitor caught JMSException trying to acknowledge message receipt"); + j.printStackTrace(); + } + catch(Exception e) + { + _logger.error("Monitor caught unexpected exception trying to handle message"); + e.printStackTrace(); + } + } + } + + /** + * Subscribes to Queue and attaches additional monitor listener + */ + public void subscribeAndMonitor() + { + try + { + _connection = _connectionFactory.createConnection(); + + //create a transactional session + Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //Queue is non-exclusive and not deleted when last consumer detaches + Destination destination = session.createQueue(_monitorDestinationName); + + //Create a consumer with a destination of our queue which will use defaults for prefetch etc + _monitorConsumer = session.createConsumer(destination); + + //give the monitor message listener a name of it's own + _monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener + ("MonitorListener " + System.currentTimeMillis())); + + MonitoredSubscriber._logger.info("Starting monitored subscription ..."); + + MonitoredSubscriber._connection.start(); + + //and now start ordinary consumption too + subscribe(); + } + catch (Throwable t) + { + _logger.error("Fatal error: " + t); + t.printStackTrace(); + } + } + + /** + * Stop consuming + */ + public void stopMonitor() + { + try + { + _monitorConsumer.close(); + _monitorConsumer = null; + stop(); + } + catch(JMSException j) + { + _logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); + } + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java new file mode 100644 index 0000000000..d2f27da052 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +import org.apache.log4j.BasicConfigurator; + +/** + * Allows you to simply start a monitored subscriber + */ +public class MonitoredSubscriptionWrapper { + + private static MonitoredSubscriber _subscriber; + + /** + * Create a monitored subscriber and start it + * @param args - no params required + */ + public static void main(String args[]) + { + //switch on logging + BasicConfigurator.configure(); + + _subscriber = new MonitoredSubscriber(); + + _subscriber.subscribe(); + } + + /** + * Stop subscribing now ... + */ + public static void stop() + { + _subscriber.stop(); + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java new file mode 100644 index 0000000000..4e92a6c678 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnectionFactory; + +import javax.jms.*; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.naming.InitialContext; + +import org.apache.qpid.example.shared.InitialContextHelper; + +/** + * Subscriber which consumes messages from a queue + */ + +public class Subscriber +{ + private static final Logger _log = Logger.getLogger(Subscriber.class); + + protected static Connection _connection; + + protected static MessageConsumer _consumer; + + protected static InitialContextHelper _contextHelper; + + protected static AMQConnectionFactory _connectionFactory; + + protected Destination _destination; + + public Subscriber() + { + try + { + //get an initial context from default properties + _contextHelper = new InitialContextHelper(null); + InitialContext ctx = _contextHelper.getInitialContext(); + + //then create a connection using the AMQConnectionFactory + _connectionFactory = (AMQConnectionFactory) ctx.lookup("local"); + + //lookup queue from context + _destination = (Destination) ctx.lookup("MyQueue"); + + } + catch (Exception e) + { + e.printStackTrace(); + _log.error(e); + } + } + + /** + * Listener class that handles messages + */ + public static class ExampleMessageListener implements MessageListener + { + private String _name; + + public ExampleMessageListener(String name) + { + _name = name; + } + + /** + * Listens for message callbacks, handles and then acknowledges them + * @param message - the message received + */ + public void onMessage(javax.jms.Message message) + { + _log.info(_name + " got message '" + message + "'"); + + try + { + //NB: Handle your message appropriately for your application here + //do some stuff + + _log.debug("Acknowledging recieved message"); + + //Now acknowledge the message to clear it from our queue + message.acknowledge(); + } + catch(JMSException j) + { + _log.error("JMSException trying to acknowledge message receipt"); + j.printStackTrace(); + } + catch(Exception e) + { + _log.error("Unexpected exception trying to handle message"); + e.printStackTrace(); + } + } + } + + /** + * Subscribes to example Queue and attaches listener + */ + public void subscribe() + { + _log.info("Starting subscription ..."); + + try + { + _connection = _connectionFactory.createConnection(); + + //create a transactional session + Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //Create a consumer with a destination of our queue which will use defaults for prefetch etc + _consumer = session.createConsumer(_destination); + + //give the message listener a name of it's own + _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis())); + + _connection.start(); + } + catch (Throwable t) + { + _log.error("Fatal error: " + t); + t.printStackTrace(); + } + + _log.info("Waiting for messages ..."); + + //wait for messages and sleep to survive failover + try + { + while(true) + { + Thread.sleep(Long.MAX_VALUE); + } + } + catch (Exception e) + { + _log.warn("Exception while Subscriber sleeping",e); + } + } + + /** + * Stop consuming and close connection + */ + public void stop() + { + try + { + _consumer.close(); + _consumer = null; + _connection.stop(); + _connection.close(); + } + catch(JMSException j) + { + _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); + } + } + +} + + + + diff --git a/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java new file mode 100644 index 0000000000..32a0ef685c --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +import org.apache.log4j.BasicConfigurator; + +/** + * Allows you to simply start a subscriber + */ +public class SubscriptionWrapper { + + private static Subscriber _subscriber; + + /** + * Create a subscriber and start it + * @param args + */ + public static void main(String args[]) + { + //switch on logging + BasicConfigurator.configure(); + + _subscriber = new Subscriber(); + + _subscriber.subscribe(); + } + + /** + * Stop subscribing now ... + */ + public static void stop() + { + _subscriber.stop(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 77b3bd7566..bec0686ce4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -21,23 +21,20 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; import javax.jms.MessageEOFException; -import javax.jms.MessageNotWriteableException; import java.io.IOException; import java.nio.charset.Charset; -import java.nio.charset.CharacterCodingException; /** * @author Apache Software Foundation */ public abstract class AbstractBytesMessage extends AbstractJMSMessage -{ +{ /** * The default initial size of the buffer. The buffer expands automatically. @@ -79,7 +76,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage { _data.clear(); } - + public String toBodyString() throws JMSException { checkReadable(); @@ -124,7 +121,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage return data; } } - + /** * Check that there is at least a certain number of bytes available to read * @@ -138,10 +135,4 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage throw new MessageEOFException("Unable to read " + len + " bytes"); } } - - public void reset() throws JMSException - { - super.reset(); - _data.flip(); - } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 514287aea7..6bd4fd0297 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -384,15 +384,15 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } public void acknowledge() throws JMSException - { + { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge // is not specified. In our case, we only set the session field where client acknowledge mode is specified. if (_session != null) { if (_session.getAMQConnection().isClosed()){ throw new javax.jms.IllegalStateException("Connection is already closed"); - } - + } + // we set multiple to true here since acknowledgement implies acknowledge of all previous messages // received on the session _session.acknowledgeMessage(_deliveryTag, true); @@ -546,7 +546,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public void reset() throws JMSException { - _readableMessage = true; + if (_readableMessage) + { + _data.rewind(); + } + else + { + _data.flip(); + _readableMessage = true; + } } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index ccb3c0bf57..04f3c5ee17 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -33,7 +33,7 @@ import java.nio.charset.Charset; */ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage { - public static final String MIME_TYPE="jms/stream-message"; + public static final String MIME_TYPE="jms/stream-message"; private static final byte BOOLEAN_TYPE = (byte) 1; @@ -55,6 +55,8 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess private static final byte STRING_TYPE = (byte) 10; + private static final byte NULL_STRING_TYPE = (byte) 11; + /** * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read * a byte array in multiple chunks, hence this is used to track how much is left to be read @@ -89,7 +91,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess return MIME_TYPE; } - private byte readAndCheckType() throws MessageFormatException, MessageEOFException, + private byte readWireType() throws MessageFormatException, MessageEOFException, MessageNotReadableException { checkReadable(); @@ -105,22 +107,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public boolean readBoolean() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); boolean result; - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private boolean readBooleanImpl() @@ -130,20 +142,30 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public byte readByte() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); byte result; - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } return result; } @@ -155,24 +177,34 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public short readShort() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); short result; - switch (wireType) + try { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } return result; } @@ -190,15 +222,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess */ public char readChar() throws JMSException { - byte wireType = readAndCheckType(); - if (wireType != CHAR_TYPE) + int position = _data.position(); + byte wireType = readWireType(); + try { - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } } - else + catch (RuntimeException e) { - checkAvailable(2); - return readCharImpl(); + _data.position(position); + throw e; } } @@ -209,30 +251,40 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public int readInt() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); int result; - switch (wireType) + try { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private int readIntImpl() @@ -242,34 +294,44 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public long readLong() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); long result; - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private long readLongImpl() @@ -279,22 +341,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public float readFloat() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); float result; - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private float readFloatImpl() @@ -304,26 +376,36 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public double readDouble() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); double result; - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private double readDoubleImpl() @@ -333,50 +415,63 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public String readString() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); String result; - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private String readStringImpl() throws JMSException @@ -406,7 +501,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess // type discriminator checked separately so you get a MessageFormatException rather than // an EOF even in the case where both would be applicable checkAvailable(1); - byte wireType = readAndCheckType(); + byte wireType = readWireType(); if (wireType != BYTEARRAY_TYPE) { throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); @@ -431,18 +526,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess } } } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } - return readBytesImpl(bytes); + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; } private int readBytesImpl(byte[] bytes) { int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); _byteArrayRemaining -= count; - if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - } + if (count == 0) { return 0; @@ -456,62 +558,74 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public Object readObject() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); Object result = null; - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + result = new byte[size]; + readBytesImpl(new byte[size]); + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: result = null; - } - else - { - _byteArrayRemaining = size; - result = new byte[size]; - readBytesImpl(new byte[size]); - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } public void writeBoolean(boolean b) throws JMSException @@ -564,18 +678,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeString(String string) throws JMSException { - writeTypeDiscriminator(STRING_TYPE); - try + if (string == null) { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must write the null terminator ourselves - _data.put((byte)0); + writeTypeDiscriminator(NULL_STRING_TYPE); } - catch (CharacterCodingException e) + else { - JMSException ex = new JMSException("Unable to encode string: " + e); - ex.setLinkedException(e); - throw ex; + writeTypeDiscriminator(STRING_TYPE); + try + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte)0); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } } } @@ -601,11 +722,17 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeObject(Object object) throws JMSException { checkWritable(); + Class clazz = null; if (object == null) { - throw new NullPointerException("Argument must not be null"); + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); } - Class clazz = object.getClass(); + if (clazz == Byte.class) { writeByte((Byte) object); diff --git a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java index 842b2d7696..2c08f1e34a 100644 --- a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java +++ b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java @@ -36,7 +36,7 @@ public class JNDIBindConnectionFactory { public static final String CONNECTION_FACTORY_BINDING = "amq.ConnectionFactory"; - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI"; + public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI"; public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory"; public static final String DEFAULT_CONNECTION_URL = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; @@ -154,7 +154,7 @@ public class JNDIBindConnectionFactory } catch (NamingException e) { - + System.out.println("Operation failed: " + e); } // Perform the bind @@ -169,11 +169,11 @@ public class JNDIBindConnectionFactory } catch (NamingException amqe) { - + System.out.println("Operation failed: " + amqe); } catch (URLSyntaxException e) { - + System.out.println("Operation failed: " + e); } } diff --git a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java index 5f328a4107..07dc8c85b3 100644 --- a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java +++ b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java @@ -20,24 +20,25 @@ */ package org.apache.qpid.IBMPerfTest; -import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQQueue; -import org.apache.log4j.Logger; -import org.apache.log4j.Level; +import org.apache.qpid.client.AMQSession; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; -import javax.jms.*; -import java.util.Hashtable; import java.io.File; -import java.net.MalformedURLException; +import java.util.Hashtable; public class JNDIBindQueue -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI"; +{ + public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI"; public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory"; @@ -98,7 +99,7 @@ public class JNDIBindQueue } catch (JMSException closeE) { - + System.out.println("Connection closing failed: " + closeE); } } diff --git a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java index c31dce22cf..16bf0dcb7a 100644 --- a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java +++ b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java @@ -20,24 +20,25 @@ */ package org.apache.qpid.IBMPerfTest; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.log4j.Logger; -import org.apache.log4j.Level; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.Topic; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; -import java.util.Hashtable; import java.io.File; -import java.net.MalformedURLException; +import java.util.Hashtable; public class JNDIBindTopic -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI"; +{ + public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI"; public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory"; @@ -99,11 +100,9 @@ public class JNDIBindTopic } catch (JMSException closeE) { - + System.out.println("Operation failed: " + closeE); } } - - } diff --git a/java/client/src/old_test/java/org/apache/qpid/example/log4j.xml b/java/client/src/old_test/java/org/apache/qpid/example/log4j.xml deleted file mode 100644 index de64423a51..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/log4j.xml +++ /dev/null @@ -1,45 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java deleted file mode 100644 index b199d41432..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.publisher; - -import org.apache.log4j.Logger; - -import java.io.File; - -import org.apache.qpid.example.shared.FileUtils; -import org.apache.qpid.example.shared.Statics; - -import javax.jms.JMSException; - -/** - * Class that sends message files to the Publisher to distribute - * using files as input - * Must set properties for host in properties file or uses in vm broker - */ -public class FileMessageDispatcher { - - protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); - - protected static Publisher _publisher = null; - - /** - * To use this main method you need to specify a path or file to use for input - * This class then uses file contents from the dir/file specified to generate - * messages to publish - * Intended to be a very simple way to get going with publishing using the broker - * @param args - must specify one value, the path to file(s) for publisher - */ - public static void main(String[] args) - { - - //Check command line args ok - must provide a path or file for us to dispatch - if (args.length == 0) - { - System.err.println("Usage: FileMessageDispatcher " + ""); - } - else - { - try - { - //publish message(s) from file(s) to configured queue - publish(args[0]); - - //Move payload file(s) to archive location as no error - FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH)); - } - catch(Exception e) - { - //log error and exit - _logger.error("Error trying to dispatch message: " + e); - System.exit(1); - } - finally - { - //clean up before exiting - if (getPublisher() != null) - { - getPublisher().cleanup(); - } - } - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("Finished dispatching message"); - } - - System.exit(0); - } - - /** - * Publish the content of a file or files from a directory as messages - * @param path - from main args - * @throws JMSException - * @throws MessageFactoryException - if cannot create message from file content - */ - public static void publish(String path) throws JMSException, MessageFactoryException - { - File tempFile = new File(path); - if (tempFile.isDirectory()) - { - //while more files in dir publish them - File[] files = tempFile.listFiles(); - - if (files == null || files.length == 0) - { - _logger.info("FileMessageDispatcher - No files to publish in input directory: " + tempFile); - } - else - { - for (File file : files) - { - //Create message factory passing in payload path - FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString()); - - //Send the message generated from the payload using the _publisher - getPublisher().sendMessage(factory.createEventMessage()); - - } - } - } - else - { - //handle a single file - //Create message factory passing in payload path - FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString()); - - //Send the message generated from the payload using the _publisher - getPublisher().sendMessage(factory.createEventMessage()); - } - } - - /** - * Cleanup before exit - */ - public static void cleanup() - { - if (getPublisher() != null) - { - getPublisher().cleanup(); - } - } - - /** - * @return A Publisher instance - */ - private static Publisher getPublisher() - { - if (_publisher != null) - { - return _publisher; - } - - //Create a _publisher - _publisher = new Publisher(); - - return _publisher; - } - -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageFactory.java deleted file mode 100644 index 88bcbbbccb..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageFactory.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.example.publisher; - -import org.apache.qpid.example.shared.FileUtils; -import org.apache.qpid.example.shared.Statics; - -import java.io.*; -import javax.jms.*; - -public class FileMessageFactory -{ - protected final Session _session; - protected final String _payload; - protected final String _filename; - - /** - * Contructs and instance using a filename from which content will be used to create message - * @param session - * @param filename - * @throws MessageFactoryException - */ - public FileMessageFactory(Session session, String filename) throws MessageFactoryException - { - try - { - _filename = filename; - _payload = FileUtils.getFileContent(filename); - _session = session; - } - catch (IOException e) - { - throw new MessageFactoryException(e.toString()); - } - } - - /** - * Creates a text message and sets filename property on it - * The filename property is purely intended to provide visibility - * of file content passing trhough the broker using example classes - * @return Message - a TextMessage with content from file - * @throws JMSException - */ - public Message createEventMessage() throws JMSException - { - TextMessage msg = _session.createTextMessage(); - msg.setText(_payload); - msg.setStringProperty(Statics.FILENAME_PROPERTY,new File(_filename).getName()); - return msg; - } - - /** - * Creates message from a string for use by the monitor - * @param session - * @param textMsg - message content - * @return Message - TextMessage with content from String - * @throws JMSException - */ - public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException - { - TextMessage msg = session.createTextMessage(); - msg.setText(textMsg); - return msg; - } - - public Message createShutdownMessage() throws JMSException - { - return _session.createTextMessage("SHUTDOWN"); - } - - public Message createReportRequestMessage() throws JMSException - { - return _session.createTextMessage("REPORT"); - } - - public Message createReportResponseMessage(String msg) throws JMSException - { - return _session.createTextMessage(msg); - } - - public boolean isShutdown(Message m) - { - return checkText(m, "SHUTDOWN"); - } - - public boolean isReport(Message m) - { - return checkText(m, "REPORT"); - } - - public Object getReport(Message m) - { - try - { - return ((TextMessage) m).getText(); - } - catch (JMSException e) - { - e.printStackTrace(System.out); - return e.toString(); - } - } - - private static boolean checkText(Message m, String s) - { - try - { - return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); - } - catch (JMSException e) - { - e.printStackTrace(System.out); - return false; - } - } -} - diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/java/client/src/old_test/java/org/apache/qpid/example/publisher/MessageFactoryException.java deleted file mode 100644 index 34360d6708..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MessageFactoryException.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.publisher; - -import org.apache.log4j.Logger; - -public class MessageFactoryException extends Exception { - - private int _errorCode; - - public MessageFactoryException(String message) - { - super(message); - } - - public MessageFactoryException(String msg, Throwable t) - { - super(msg, t); - } - - public MessageFactoryException(int errorCode, String msg, Throwable t) - { - super(msg + " [error code " + errorCode + ']', t); - _errorCode = errorCode; - } - - public MessageFactoryException(int errorCode, String msg) - { - super(msg + " [error code " + errorCode + ']'); - _errorCode = errorCode; - } - - public MessageFactoryException(Logger logger, String msg, Throwable t) - { - this(msg, t); - logger.error(getMessage(), this); - } - - public MessageFactoryException(Logger logger, String msg) - { - this(msg); - logger.error(getMessage(), this); - } - - public MessageFactoryException(Logger logger, int errorCode, String msg) - { - this(errorCode, msg); - logger.error(getMessage(), this); - } - - public int getErrorCode() - { - return _errorCode; - } -} - diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java deleted file mode 100644 index 8784d340da..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.publisher; - -import org.apache.log4j.Logger; -import org.apache.log4j.BasicConfigurator; - -import javax.jms.*; - -import java.util.Properties; - -/** - * Class that sends heartbeat messages to allow monitoring of message consumption - * Sends regular (currently 20 seconds apart) heartbeat message - */ -public class MonitorMessageDispatcher { - - private static final Logger _logger = Logger.getLogger(MonitorMessageDispatcher.class); - - protected static MonitorPublisher _monitorPublisher = null; - - protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; - - /** - * Easy entry point for running a message dispatcher for monitoring consumption - * @param args - */ - public static void main(String[] args) - { - - //Switch on logging appropriately for your app - BasicConfigurator.configure(); - - try - { - while(true) - { - try - { - //endlessly publish messages to monitor queue - publish(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Dispatched monitor message"); - } - - //sleep for twenty seconds and then publish again - change if appropriate - Thread.sleep(20000); - } - catch(UndeliveredMessageException a) - { - //trigger application specific failure handling here - _logger.error("Problem delivering monitor message"); - break; - } - } - } - catch(Exception e) - { - _logger.error("Error trying to dispatch AMS monitor message: " + e); - System.exit(1); - } - finally - { - if (getMonitorPublisher() != null) - { - getMonitorPublisher().cleanup(); - } - } - - System.exit(1); - } - - /** - * Publish heartbeat message - * @throws JMSException - * @throws UndeliveredMessageException - */ - public static void publish() throws JMSException, UndeliveredMessageException - { - //Send the message generated from the payload using the _publisher - getMonitorPublisher().sendImmediateMessage - (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); - } - - /** - * Cleanup publishers - */ - public static void cleanup() - { - if (getMonitorPublisher() != null) - { - getMonitorPublisher().cleanup(); - } - - if (getMonitorPublisher() != null) - { - getMonitorPublisher().cleanup(); - } - } - - //Returns a _publisher for the monitor queue - private static MonitorPublisher getMonitorPublisher() - { - if (_monitorPublisher != null) - { - return _monitorPublisher; - } - - //Create a _publisher using failover details and constant for monitor queue - _monitorPublisher = new MonitorPublisher(); - - _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); - return _monitorPublisher; - } - -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorPublisher.java deleted file mode 100644 index 233c3fea0a..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorPublisher.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.publisher; - -import javax.jms.Message; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import org.apache.qpid.client.BasicMessageProducer; -import org.apache.log4j.Logger; - -/** - * Subclass of Publisher which uses QPID functionality to send a heartbeat message - * Note immediate flag not available via JMS MessageProducer - */ -public class MonitorPublisher extends Publisher -{ - - private static final Logger _log = Logger.getLogger(Publisher.class); - - BasicMessageProducer _producer; - - public MonitorPublisher() - { - super(); - } - - /* - * Publishes a non-persistent message using transacted session - */ - public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException - { - try - { - _producer = (BasicMessageProducer)_session.createProducer(_destination); - - //Send message via our producer which is not persistent and is immediate - //NB: not available via jms interface MessageProducer - _producer.send(message, DeliveryMode.NON_PERSISTENT, true); - - //commit the message send and close the transaction - _session.commit(); - - } - catch (JMSException e) - { - //Have to assume our commit failed but do not rollback here as channel closed - _log.error(e); - e.printStackTrace(); - throw new UndeliveredMessageException("Cannot deliver immediate message",e); - } - - _log.info(_name + " finished sending message: " + message); - return true; - } -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/src/old_test/java/org/apache/qpid/example/publisher/Publisher.java deleted file mode 100644 index be42e0e413..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/Publisher.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.publisher; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnectionFactory; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.DeliveryMode; -import javax.jms.Queue; -import javax.jms.MessageProducer; -import javax.jms.Connection; -import javax.jms.Session; - -import javax.naming.InitialContext; - -import org.apache.qpid.example.shared.InitialContextHelper; - -public class Publisher -{ - private static final Logger _log = Logger.getLogger(Publisher.class); - - protected InitialContextHelper _contextHelper; - - protected Connection _connection; - - protected Session _session; - - protected MessageProducer _producer; - - protected String _destinationDir; - - protected String _name = "Publisher"; - - protected Queue _destination; - - protected static final String _defaultDestinationDir = "/tmp"; - - /** - * Creates a Publisher instance using properties from example.properties - * See InitialContextHelper for details of how context etc created - */ - public Publisher() - { - try - { - //get an initial context from default properties - _contextHelper = new InitialContextHelper(null); - InitialContext ctx = _contextHelper.getInitialContext(); - - //then create a connection using the AMQConnectionFactory - AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local"); - _connection = cf.createConnection(); - - //create a transactional session - _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - - //lookup the example queue and use it - //Queue is non-exclusive and not deleted when last consumer detaches - _destination = _session.createQueue((String)ctx.lookup("MyQueue")); - - //create a message producer - _producer = _session.createProducer(_destination); - - //set destination dir for files that have been processed - _destinationDir = _defaultDestinationDir; - - _connection.start(); - } - catch (Exception e) - { - e.printStackTrace(); - _log.error(e); - } - } - - /** - * Publishes a non-persistent message using transacted session - * Note that persistent is the default mode for send - so need to specify for transient - */ - public boolean sendMessage(Message message) - { - try - { - //Send message via our producer which is not persistent - _producer.send(message, DeliveryMode.NON_PERSISTENT, _producer.getPriority(), _producer.getTimeToLive()); - - //commit the message send and close the transaction - _session.commit(); - - } - catch (JMSException e) - { - //Have to assume our commit failed and rollback here - try - { - _session.rollback(); - _log.error(e); - e.printStackTrace(); - return false; - } - catch (JMSException j) - { - _log.error("Unable to rollback publish transaction ",e); - return false; - } - } - - _log.info(_name + " finished sending message: " + message); - return true; - } - - /** - * Cleanup resources before exit - */ - public void cleanup() - { - try - { - if (_connection != null) - { - _connection.stop(); - _connection.close(); - } - _connection = null; - _producer = null; - } - catch(Exception e) - { - _log.error("Error trying to cleanup publisher " + e); - System.exit(1); - } - } - - /** - * Exposes session - * @return Session - */ - public Session getSession() - { - return _session; - } - - public String getDestinationDir() - { - return _destinationDir; - } - - public void setDestinationDir(String destinationDir) - { - _destinationDir = destinationDir; - } - - public String getName() - { - return _name; - } - - public void setName(String _name) { - this._name = _name; - } -} - diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/java/client/src/old_test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java deleted file mode 100644 index 3335833c2d..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.publisher; - -import org.apache.log4j.Logger; - -/** - * Exception thrown by monitor when cannot send a message marked for immediate delivery - */ -public class UndeliveredMessageException extends Exception { - - private int _errorCode; - - public UndeliveredMessageException(String message) - { - super(message); - } - - public UndeliveredMessageException(String msg, Throwable t) - { - super(msg, t); - } - - public UndeliveredMessageException(int errorCode, String msg, Throwable t) - { - super(msg + " [error code " + errorCode + ']', t); - _errorCode = errorCode; - } - - public UndeliveredMessageException(int errorCode, String msg) - { - super(msg + " [error code " + errorCode + ']'); - _errorCode = errorCode; - } - - public UndeliveredMessageException(Logger logger, String msg, Throwable t) - { - this(msg, t); - logger.error(getMessage(), this); - } - - public UndeliveredMessageException(Logger logger, String msg) - { - this(msg); - logger.error(getMessage(), this); - } - - public UndeliveredMessageException(Logger logger, int errorCode, String msg) - { - this(errorCode, msg); - logger.error(getMessage(), this); - } - - public int getErrorCode() - { - return _errorCode; - } -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/ConnectionException.java b/java/client/src/old_test/java/org/apache/qpid/example/shared/ConnectionException.java deleted file mode 100644 index 8723983862..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/ConnectionException.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.shared; - -import org.apache.log4j.Logger; - -public class ConnectionException extends Exception { - - private int _errorCode; - - public ConnectionException(String message) - { - super(message); - } - - public ConnectionException(String msg, Throwable t) - { - super(msg, t); - } - - public ConnectionException(int errorCode, String msg, Throwable t) - { - super(msg + " [error code " + errorCode + ']', t); - _errorCode = errorCode; - } - - public ConnectionException(int errorCode, String msg) - { - super(msg + " [error code " + errorCode + ']'); - _errorCode = errorCode; - } - - public ConnectionException(Logger logger, String msg, Throwable t) - { - this(msg, t); - logger.error(getMessage(), this); - } - - public ConnectionException(Logger logger, String msg) - { - this(msg); - logger.error(getMessage(), this); - } - - public ConnectionException(Logger logger, int errorCode, String msg) - { - this(errorCode, msg); - logger.error(getMessage(), this); - } - - public int getErrorCode() - { - return _errorCode; - } -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/ContextException.java b/java/client/src/old_test/java/org/apache/qpid/example/shared/ContextException.java deleted file mode 100644 index 787cecd541..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/ContextException.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.shared; - -import org.apache.log4j.Logger; - -public class ContextException extends Exception { - - private int _errorCode; - - public ContextException(String message) - { - super(message); - } - - public ContextException(String msg, Throwable t) - { - super(msg, t); - } - - public ContextException(int errorCode, String msg, Throwable t) - { - super(msg + " [error code " + errorCode + ']', t); - _errorCode = errorCode; - } - - public ContextException(int errorCode, String msg) - { - super(msg + " [error code " + errorCode + ']'); - _errorCode = errorCode; - } - - public ContextException(Logger logger, String msg, Throwable t) - { - this(msg, t); - logger.error(getMessage(), this); - } - - public ContextException(Logger logger, String msg) - { - this(msg); - logger.error(getMessage(), this); - } - - public ContextException(Logger logger, int errorCode, String msg) - { - this(errorCode, msg); - logger.error(getMessage(), this); - } - - public int getErrorCode() - { - return _errorCode; - } -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/FileUtils.java b/java/client/src/old_test/java/org/apache/qpid/example/shared/FileUtils.java deleted file mode 100644 index 54446cb6a7..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/FileUtils.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.shared; - -import java.io.*; - -/** - * Class that provides file related utility methods for utility use - */ -public class FileUtils { - - - //Reads file content into String - public static String getFileContent(String filePath) throws IOException - { - - BufferedReader reader = null; - String tempData = ""; - String eol = "\n\r"; - - try - { - String line; - reader = new BufferedReader(new FileReader(filePath)); - while ((line = reader.readLine()) != null) - { - if (!tempData.equals("")) - { - tempData = tempData + eol + line; - } - else - { - tempData = line; - } - } - } - finally - { - if (reader != null) - { - reader.close(); - } - } - return tempData; - } - - /* - * Reads xml from a file and returns it as an array of chars - */ - public static char[] getFileAsCharArray(String filePath) throws IOException - { - BufferedReader reader = null; - char[] tempChars = null; - String tempData = ""; - - try - { - String line; - reader = new BufferedReader(new FileReader(filePath)); - while ((line = reader.readLine()) != null) - { - tempData = tempData + line; - } - tempChars = tempData.toCharArray(); - } - finally - { - if (reader != null) - { - reader.close(); - } - } - return tempChars; - } - - /* - * Write String content to filename provided - */ - public static void writeStringToFile(String content, String path) throws IOException - { - - BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path))); - writer.write(content); - writer.flush(); - writer.close(); - } - - /* - * Allows moving of files to a new dir and preserves the last bit of the name only - */ - public static void moveFileToNewDir(String path, String newDir) throws IOException - { - //get file name from current path - //while more files in dir publish them - File pathFile = new File(path); - if (pathFile.isDirectory()) - { - File[] files = pathFile.listFiles(); - for (File file : files) - { - moveFileToNewDir(file,newDir); - } - } - } - - /* - * Allows moving of a file to a new dir and preserves the last bit of the name only - */ - public static void moveFileToNewDir(File fileToMove, String newDir) throws IOException - { - moveFile(fileToMove,getArchiveFileName(fileToMove,newDir)); - } - - /* - * Moves file from a given path to a new path with String params - */ - public static void moveFile(String fromPath, String dest) throws IOException - { - moveFile(new File(fromPath),new File(dest)); - } - - /* - * Moves file from a given path to a new path with mixed params - */ - public static void moveFile(File fileToMove, String dest) throws IOException - { - moveFile(fileToMove,new File(dest)); - } - - /* - * Moves file from a given path to a new path with File params - */ - public static void moveFile(File fileToMove, File dest) throws IOException - { - fileToMove.renameTo(dest); - } - - /* - * Deletes a given file - */ - public static void deleteFile(String filePath) throws IOException - { - new File(filePath).delete(); - } - - private static String getArchiveFileName(File fileToMove, String archiveDir) - { - //get file name from current path - String fileName = fileToMove.getName(); - return archiveDir + File.separator + fileName; - } -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/src/old_test/java/org/apache/qpid/example/shared/InitialContextHelper.java deleted file mode 100644 index b39892b688..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/InitialContextHelper.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.example.shared; - -import org.apache.log4j.Logger; - -import javax.naming.InitialContext; -import javax.naming.NamingException; -import java.util.Properties; -import java.io.InputStream; -import java.io.IOException; - -/** - * Class that provides helper methods for JNDI - */ -public class InitialContextHelper { - - public static final String _defaultPropertiesName = "example.properties"; - protected static Properties _fileProperties; - protected static InitialContext _initialContext; - protected static final Logger _log = Logger.getLogger(InitialContextHelper.class); - - public InitialContextHelper(String propertiesName) throws ContextException - { - try - { - if (propertiesName == null || propertiesName.length() == 0) - { - propertiesName = _defaultPropertiesName; - } - - _fileProperties = new Properties(); - ClassLoader cl = this.getClass().getClassLoader(); - - //NB: Need to change path to reflect package if moving classes around ! - InputStream is = cl.getResourceAsStream("org/apache/qpid/example/shared/" + propertiesName); - _fileProperties.load(is); - _initialContext = new InitialContext(_fileProperties); - } - catch (IOException e) - { - throw new ContextException(_log, e.toString()); - } - catch (NamingException n) - { - throw new ContextException(_log, n.toString()); - } - } - - public Properties getFileProperties() - { - return _fileProperties; - } - - public InitialContext getInitialContext() - { - return _initialContext; - } - -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/Statics.java b/java/client/src/old_test/java/org/apache/qpid/example/shared/Statics.java deleted file mode 100644 index c056f8a7da..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/Statics.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.shared; - -/** - * Constants used by AMS Publisher/Subscriber classes - */ -public class Statics { - - public static final String TOPIC_NAME = "EXAMPLE_TOPIC"; - - public static final String QUEUE_NAME = "EXAMPLE_QUEUE"; - - public static final String MONITOR_QUEUE_SUFFIX = "_MONITOR"; - - public static final String HOST_PROPERTY = "host"; - - public static final String PORT_PROPERTY = "port"; - - public static final String USER_PROPERTY = "user"; - - public static final String PWD_PROPERTY = "pwd"; - - public static final String TOPIC_PROPERTY = "topic"; - - public static final String QUEUE_PROPERTY = "queue"; - - public static final String VIRTUAL_PATH_PROPERTY = "virtualpath"; - - public static final String ARCHIVE_PATH = "archivepath"; - - public static final String CLIENT_PROPERTY = "client"; - - public static final String FILENAME_PROPERTY = "filename"; - - public static final String DEFAULT_USER = "guest"; - - public static final String DEFAULT_PWD = "guest"; - - -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/example.properties b/java/client/src/old_test/java/org/apache/qpid/example/shared/example.properties deleted file mode 100644 index 82de41908f..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/example.properties +++ /dev/null @@ -1,21 +0,0 @@ -java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory - -# use the following property to configure the default connector -#java.naming.provider.url - ignored. - -# register some connection factories -# connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='vm://:1' - -# register some queues in JNDI using the form -# queue.[jndiName] = [physicalName] -queue.MyQueue = example.MyQueue - -# register some topics in JNDI using the form -# topic.[jndiName] = [physicalName] -topic.ibmStocks = stocks.nyse.ibm - -# Register an AMQP destination in JNDI -# NOTE: Qpid currently only supports direct,topics and headers -# destination.[jniName] = [BindingURL] -destination.direct = direct://amq.direct//directQueue diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java deleted file mode 100644 index 9c195aef40..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.subscriber; - -import org.apache.log4j.Logger; -import org.apache.qpid.example.shared.Statics; - -import javax.jms.*; - -/** - * Subclass of Subscriber which consumes a heartbeat message - */ - -public class MonitoredSubscriber extends Subscriber -{ - protected String _monitorDestinationName; - - private static final Logger _logger = Logger.getLogger(MonitoredSubscriber.class); - - private static MessageConsumer _monitorConsumer; - - public MonitoredSubscriber() - { - super(); - //lookup queue name and append suffix - _monitorDestinationName = _destinationName + Statics.MONITOR_QUEUE_SUFFIX; - } - - /** - * MessageListener implementation for this subscriber - */ - public static class MonitorMessageListener implements MessageListener - { - private String _name; - - public MonitorMessageListener(String name) - { - _name = name; - - } - - /** - * Listens for heartbeat messages and acknowledges them - * @param message - */ - public void onMessage(javax.jms.Message message) - { - _logger.info(_name + " monitor got message '" + message + "'"); - - try - { - _logger.debug("Monitor acknowledging recieved message"); - - //Now acknowledge the message to clear it from our queue - message.acknowledge(); - } - catch(JMSException j) - { - _logger.error("Monitor caught JMSException trying to acknowledge message receipt"); - j.printStackTrace(); - } - catch(Exception e) - { - _logger.error("Monitor caught unexpected exception trying to handle message"); - e.printStackTrace(); - } - } - } - - /** - * Subscribes to Queue and attaches additional monitor listener - */ - public void subscribeAndMonitor() - { - try - { - _connection = _connectionFactory.createConnection(); - - //create a transactional session - Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - - //Queue is non-exclusive and not deleted when last consumer detaches - Destination destination = session.createQueue(_monitorDestinationName); - - //Create a consumer with a destination of our queue which will use defaults for prefetch etc - _monitorConsumer = session.createConsumer(destination); - - //give the monitor message listener a name of it's own - _monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener - ("MonitorListener " + System.currentTimeMillis())); - - MonitoredSubscriber._logger.info("Starting monitored subscription ..."); - - MonitoredSubscriber._connection.start(); - - //and now start ordinary consumption too - subscribe(); - } - catch (Throwable t) - { - _logger.error("Fatal error: " + t); - t.printStackTrace(); - } - } - - /** - * Stop consuming - */ - public void stopMonitor() - { - try - { - _monitorConsumer.close(); - _monitorConsumer = null; - stop(); - } - catch(JMSException j) - { - _logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); - } - } - -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java deleted file mode 100644 index d2f27da052..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.subscriber; - -import org.apache.log4j.BasicConfigurator; - -/** - * Allows you to simply start a monitored subscriber - */ -public class MonitoredSubscriptionWrapper { - - private static MonitoredSubscriber _subscriber; - - /** - * Create a monitored subscriber and start it - * @param args - no params required - */ - public static void main(String args[]) - { - //switch on logging - BasicConfigurator.configure(); - - _subscriber = new MonitoredSubscriber(); - - _subscriber.subscribe(); - } - - /** - * Stop subscribing now ... - */ - public static void stop() - { - _subscriber.stop(); - } -} diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java deleted file mode 100644 index 34c7d6c7bb..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.subscriber; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnectionFactory; - -import javax.jms.*; -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.naming.InitialContext; - -import org.apache.qpid.example.shared.InitialContextHelper; - -/** - * Subscriber which consumes messages from a queue - */ - -public class Subscriber -{ - private static final Logger _log = Logger.getLogger(Subscriber.class); - - protected static Connection _connection; - - protected static MessageConsumer _consumer; - - protected static InitialContextHelper _contextHelper; - - protected static AMQConnectionFactory _connectionFactory; - - protected String _destinationName; - - public Subscriber() - { - try - { - //get an initial context from default properties - _contextHelper = new InitialContextHelper(null); - InitialContext ctx = _contextHelper.getInitialContext(); - - //then create a connection using the AMQConnectionFactory - _connectionFactory = (AMQConnectionFactory) ctx.lookup("local"); - - //lookup queue name - _destinationName = (String) ctx.lookup("MyQueue"); - - } - catch (Exception e) - { - e.printStackTrace(); - _log.error(e); - } - } - - /** - * Listener class that handles messages - */ - public static class ExampleMessageListener implements MessageListener - { - private String _name; - - public ExampleMessageListener(String name) - { - _name = name; - - } - - /** - * Listens for message callbacks, handles and then acknowledges them - * @param message - the message received - */ - public void onMessage(javax.jms.Message message) - { - _log.info(_name + " got message '" + message + "'"); - - try - { - //NB: Handle your message appropriately for your application here - //do some stuff - - _log.debug("Acknowledging recieved message"); - - //Now acknowledge the message to clear it from our queue - message.acknowledge(); - } - catch(JMSException j) - { - _log.error("JMSException trying to acknowledge message receipt"); - j.printStackTrace(); - } - catch(Exception e) - { - _log.error("Unexpected exception trying to handle message"); - e.printStackTrace(); - } - } - } - - /** - * Subscribes to example Queue and attaches listener - */ - public void subscribe() - { - _log.info("Starting subscription ..."); - - try - { - _connection = _connectionFactory.createConnection(); - - //create a transactional session - Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - - //Queue is non-exclusive and not deleted when last consumer detaches - Destination destination = session.createQueue(_destinationName); - - //Create a consumer with a destination of our queue which will use defaults for prefetch etc - _consumer = session.createConsumer(destination); - - //give the message listener a name of it's own - _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis())); - - _connection.start(); - } - catch (Throwable t) - { - _log.error("Fatal error: " + t); - t.printStackTrace(); - } - - _log.info("Waiting for messages ..."); - - //wait for messages and sleep to survive failover - try - { - while(true) - { - Thread.sleep(Long.MAX_VALUE); - } - } - catch (Exception e) - { - _log.warn("Exception while Subscriber sleeping",e); - } - } - - /** - * Set destination (queue or topic) name - * @param name - */ - public void setDestinationName(String name) - { - _destinationName = name; - } - - /** - * Stop consuming and close connection - */ - public void stop() - { - try - { - _consumer.close(); - _consumer = null; - _connection.stop(); - _connection.close(); - } - catch(JMSException j) - { - _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); - } - } - -} - - - - diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java deleted file mode 100644 index 32a0ef685c..0000000000 --- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.example.subscriber; - -import org.apache.log4j.BasicConfigurator; - -/** - * Allows you to simply start a subscriber - */ -public class SubscriptionWrapper { - - private static Subscriber _subscriber; - - /** - * Create a subscriber and start it - * @param args - */ - public static void main(String args[]) - { - //switch on logging - BasicConfigurator.configure(); - - _subscriber = new Subscriber(); - - _subscriber.subscribe(); - } - - /** - * Stop subscribing now ... - */ - public static void stop() - { - _subscriber.stop(); - } -} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java index ef00f0b9f2..727881de96 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java @@ -168,24 +168,6 @@ public class StreamMessageTest extends TestCase } } - public void testWriteObjectThrowsNPE() throws Exception - { - try - { - JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); - bm.writeObject(null); - fail("expected exception did not occur"); - } - catch (NullPointerException n) - { - // ok - } - catch (Exception e) - { - fail("expected NullPointerException, got " + e); - } - } - public void testReadBoolean() throws Exception { JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); @@ -221,9 +203,34 @@ public class StreamMessageTest extends TestCase len = bm.readBytes(bytes); assertEquals(-1, len); len = bm.readBytes(bytes); + assertEquals(-1, len); + len = bm.readBytes(bytes); assertEquals(0, len); } + public void testReadBytesFollowedByPrimitive() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7, 8}); + bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7}); + bm.writeString("Foo"); + bm.reset(); + int len; + do + { + len = bm.readBytes(new byte[2]); + } + while (len == 2); + + do + { + len = bm.readBytes(new byte[2]); + } + while (len == 2); + + assertEquals("Foo", bm.readString()); + } + public void testReadMultipleByteArrays() throws Exception { JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); @@ -577,11 +584,11 @@ public class StreamMessageTest extends TestCase bm = TestMessageHelper.newJMSStreamMessage(); bm.writeString("2"); bm.reset(); - assertEquals((byte)2, bm.readByte()); + assertEquals((byte)2, bm.readByte()); bm.reset(); assertEquals((short)2, bm.readShort()); bm.reset(); - assertEquals((int)2, bm.readInt()); + assertEquals(2, bm.readInt()); bm.reset(); assertEquals((long)2, bm.readLong()); bm = TestMessageHelper.newJMSStreamMessage(); @@ -592,6 +599,16 @@ public class StreamMessageTest extends TestCase assertEquals(5.7d, bm.readDouble()); } + public void testNulls() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeString(null); + bm.writeObject(null); + bm.reset(); + assertNull(bm.readObject()); + assertNull(bm.readObject()); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(StreamMessageTest.class); diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index b9aee2087e..6213a9d318 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -94,25 +94,45 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected void routeAndTest(Message m, TestQueue... expected) throws AMQException { - routeAndTest(m, Arrays.asList(expected)); + routeAndTest(m, false, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException + { + routeAndTest(m, expectReturn, Arrays.asList(expected)); } protected void routeAndTest(Message m, List expected) throws AMQException { - route(m); - for (TestQueue q : queues) + routeAndTest(m, false, expected); + } + + protected void routeAndTest(Message m, boolean expectReturn, List expected) throws AMQException + { + try { - if (expected.contains(q)) + route(m); + assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); + for (TestQueue q : queues) { - assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + if (expected.contains(q)) + { + assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; + } + else + { + assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + } } } + + catch (NoRouteException ex) + { + assertTrue("Expected "+m+" not to be returned",expectReturn); + } + } static FieldTable getHeaders(String... entries) diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 91520df3bf..c220442a78 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.framing.BasicPublishBody; public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase { @@ -52,6 +53,19 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q5, q6, q7, q8); routeAndTest(new Message("Message6", "F0002")); + + Message m7 = new Message("Message7", "XXXXX"); + + BasicPublishBody pb7 = m7.getPublishBody(); + pb7.mandatory = true; + routeAndTest(m7,true); + + Message m8 = new Message("Message8", "F0000"); + BasicPublishBody pb8 = m8.getPublishBody(); + pb8.mandatory = true; + routeAndTest(m8,false,q1); + + } public void testAny() throws AMQException @@ -71,6 +85,20 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase routeAndTest(new Message("Message6", "F0002")); } + public void testMandatory() throws AMQException + { + TestQueue q1 = bindDefault("F0000"); + Message m1 = new Message("Message1", "XXXXX"); + Message m2 = new Message("Message2", "F0000"); + BasicPublishBody pb1 = m1.getPublishBody(); + pb1.mandatory = true; + BasicPublishBody pb2 = m1.getPublishBody(); + pb2.mandatory = true; + routeAndTest(m1,true); + + + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(HeadersExchangeTest.class); diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java new file mode 100644 index 0000000000..4dffe3e75f --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -0,0 +1,151 @@ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.test.VMBrokerSetup; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.client.*; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.PropertyFieldTable; + +import javax.jms.*; +import java.util.List; +import java.util.Collections; +import java.util.ArrayList; + +public class ReturnUnroutableMandatoryMessageTest extends TestCase implements ExceptionListener +{ + private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class); + + private final List _bouncedMessageList = Collections.synchronizedList(new ArrayList()); + + static + { + String workdir = System.getProperty("QPID_WORK"); + if (workdir == null || workdir.equals("")) + { + String tempdir = System.getProperty("java.io.tmpdir"); + System.out.println("QPID_WORK not set using tmp directory: " + tempdir); + System.setProperty("QPID_WORK", tempdir); + } +// DOMConfigurator.configure("../broker/etc/log4j.xml"); + } + + protected void setUp() throws Exception + { + super.setUp(); + ApplicationRegistry.initialise(new TestApplicationRegistry(), 1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + /** + * Tests that mandatory message which are not routable are returned to the producer + * + * @throws Exception + */ + public void testReturnUnroutableMandatoryMessage() throws Exception + { + _bouncedMessageList.clear(); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); + + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + + AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'")); + FieldTable ft = new PropertyFieldTable(); + ft.setString("F1000","1"); + MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft); + + + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + + con2.setExceptionListener(this); + AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Need to start the "producer" connection in order to receive bounced messages + _logger.info("Starting producer connection"); + con2.start(); + + + MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false); + MessageProducer mandatoryProducer = producerSession.createProducer(queue); + + + // First test - should neither be bounced nor routed + _logger.info("Sending non-routable non-mandatory message"); + TextMessage msg1 = producerSession.createTextMessage("msg1"); + nonMandatoryProducer.send(msg1); + + // Second test - should be bounced + _logger.info("Sending non-routable mandatory message"); + TextMessage msg2 = producerSession.createTextMessage("msg2"); + mandatoryProducer.send(msg2); + + // Third test - should be routed + _logger.info("Sending routable message"); + TextMessage msg3 = producerSession.createTextMessage("msg3"); + msg3.setStringProperty("F1000","1"); + mandatoryProducer.send(msg3); + + + + _logger.info("Starting consumer connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(1000L); + + assertTrue("No message routed to receiver",tm != null); + assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText())); + + try + { + Thread.sleep(1000L); + } + catch(InterruptedException e) + { + ; + } + + assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1); + Message m = _bouncedMessageList.get(0); + assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2")); + + + + + con.close(); + con2.close(); + + + } + + public static junit.framework.Test suite() + { + return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class)); + } + + public void onException(JMSException jmsException) + { + _logger.warn("Caught exception on producer: ",jmsException); + Exception linkedException = jmsException.getLinkedException(); + if(linkedException instanceof AMQNoRouteException) + { + AMQNoRouteException noRoute = (AMQNoRouteException) linkedException; + Message bounced = (Message) noRoute.getUndeliveredMessage(); + _bouncedMessageList.add(bounced); + } + } +} -- cgit v1.2.1