diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-21 14:24:38 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-21 14:24:38 +0000 |
commit | 6d5154b0d5ad1fb32aadda06801c5dc8fdc958eb (patch) | |
tree | adfaef57db9a1612b598e96f014109032b43d7ee | |
parent | 30740c7824aed3b1f3984ee9992fde365c72cd16 (diff) | |
download | qpid-python-6d5154b0d5ad1fb32aadda06801c5dc8fdc958eb.tar.gz |
Merge from trunk up to rev 486165
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@489368 13f79535-47bb-0310-9956-ffa450edef68
30 files changed, 773 insertions, 340 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 818e5b04b2..dbf4ef91db 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -206,7 +206,18 @@ public class HeadersExchange extends AbstractExchange } if (!routed) { - _logger.warn("Exchange " + getName() + ": message not routable."); + + String msg = "Exchange " + getName() + ": message not routable."; + + if (payload.getPublishBody().mandatory) + { + throw new NoRouteException(msg, payload); + } + else + { + _logger.warn(msg); + } + } } diff --git a/java/client/example/pom.xml b/java/client/example/pom.xml new file mode 100644 index 0000000000..ac0081c00b --- /dev/null +++ b/java/client/example/pom.xml @@ -0,0 +1,94 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-example</artifactId> + <packaging>jar</packaging> + <version>1.0-incubating-M2-SNAPSHOT</version> + <name>Qpid Example</name> + <url>http://cwiki.apache.org/confluence/display/qpid</url> + + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid</artifactId> + <version>1.0-incubating-M2-SNAPSHOT</version> + </parent> + + <properties> + <topDirectoryLocation>..</topDirectoryLocation> + <amqj.logging.level>warn</amqj.logging.level> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + </dependency> + + <dependency> + <groupId>jmscts</groupId> + <artifactId>jmscts</artifactId> + <version>0.5-b2</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <property> + <name>amqj.noAutoCreateVMBroker</name> + <value>true</value> + </property> + <property> + <name>amqj.logging.level</name> + <value>${amqj.logging.level}</value> + </property> + <property> + <name>log4j.configuration</name> + <value>file:///${basedir}/src/main/java/log4j.properties</value> + </property> + </systemProperties> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/java/client/src/old_test/java/org/apache/qpid/example/log4j.xml b/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml index de64423a51..de64423a51 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/log4j.xml +++ b/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java index b199d41432..b199d41432 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java index 88bcbbbccb..88bcbbbccb 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageFactory.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java index 34360d6708..34360d6708 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MessageFactoryException.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index 8784d340da..8784d340da 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java index 233c3fea0a..233c3fea0a 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorPublisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java index be42e0e413..2bde4ec35c 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/Publisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java @@ -75,7 +75,7 @@ public class Publisher //lookup the example queue and use it //Queue is non-exclusive and not deleted when last consumer detaches - _destination = _session.createQueue((String)ctx.lookup("MyQueue")); + _destination = (Queue) ctx.lookup("MyQueue"); //create a message producer _producer = _session.createProducer(_destination); diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java index 3335833c2d..3335833c2d 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/ConnectionException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java index 8723983862..8723983862 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/ConnectionException.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/ContextException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java index 787cecd541..787cecd541 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/ContextException.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/FileUtils.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java index 54446cb6a7..54446cb6a7 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/FileUtils.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java index b39892b688..b39892b688 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/InitialContextHelper.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/Statics.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java index c056f8a7da..c056f8a7da 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/Statics.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/example.properties b/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties index 82de41908f..7f513341a2 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/shared/example.properties +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties @@ -5,7 +5,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF # register some connection factories # connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='vm://:1' +connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' # register some queues in JNDI using the form # queue.[jndiName] = [physicalName] diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java index 9c195aef40..1d2e5e0e66 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java @@ -39,7 +39,7 @@ public class MonitoredSubscriber extends Subscriber { super(); //lookup queue name and append suffix - _monitorDestinationName = _destinationName + Statics.MONITOR_QUEUE_SUFFIX; + _monitorDestinationName = _destination.toString() + Statics.MONITOR_QUEUE_SUFFIX; } /** diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java index d2f27da052..d2f27da052 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java index 34c7d6c7bb..4e92a6c678 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java @@ -45,7 +45,7 @@ public class Subscriber protected static AMQConnectionFactory _connectionFactory; - protected String _destinationName; + protected Destination _destination; public Subscriber() { @@ -58,8 +58,8 @@ public class Subscriber //then create a connection using the AMQConnectionFactory _connectionFactory = (AMQConnectionFactory) ctx.lookup("local"); - //lookup queue name - _destinationName = (String) ctx.lookup("MyQueue"); + //lookup queue from context + _destination = (Destination) ctx.lookup("MyQueue"); } catch (Exception e) @@ -79,7 +79,6 @@ public class Subscriber public ExampleMessageListener(String name) { _name = name; - } /** @@ -127,11 +126,8 @@ public class Subscriber //create a transactional session Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - //Queue is non-exclusive and not deleted when last consumer detaches - Destination destination = session.createQueue(_destinationName); - //Create a consumer with a destination of our queue which will use defaults for prefetch etc - _consumer = session.createConsumer(destination); + _consumer = session.createConsumer(_destination); //give the message listener a name of it's own _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis())); @@ -161,15 +157,6 @@ public class Subscriber } /** - * Set destination (queue or topic) name - * @param name - */ - public void setDestinationName(String name) - { - _destinationName = name; - } - - /** * Stop consuming and close connection */ public void stop() diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java index 32a0ef685c..32a0ef685c 100644 --- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 77b3bd7566..bec0686ce4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -21,23 +21,20 @@ package org.apache.qpid.client.message; import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; import javax.jms.MessageEOFException; -import javax.jms.MessageNotWriteableException; import java.io.IOException; import java.nio.charset.Charset; -import java.nio.charset.CharacterCodingException; /** * @author Apache Software Foundation */ public abstract class AbstractBytesMessage extends AbstractJMSMessage -{ +{ /** * The default initial size of the buffer. The buffer expands automatically. @@ -79,7 +76,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage { _data.clear(); } - + public String toBodyString() throws JMSException { checkReadable(); @@ -124,7 +121,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage return data; } } - + /** * Check that there is at least a certain number of bytes available to read * @@ -138,10 +135,4 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage throw new MessageEOFException("Unable to read " + len + " bytes"); } } - - public void reset() throws JMSException - { - super.reset(); - _data.flip(); - } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 514287aea7..6bd4fd0297 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -384,15 +384,15 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } public void acknowledge() throws JMSException - { + { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge // is not specified. In our case, we only set the session field where client acknowledge mode is specified. if (_session != null) { if (_session.getAMQConnection().isClosed()){ throw new javax.jms.IllegalStateException("Connection is already closed"); - } - + } + // we set multiple to true here since acknowledgement implies acknowledge of all previous messages // received on the session _session.acknowledgeMessage(_deliveryTag, true); @@ -546,7 +546,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms public void reset() throws JMSException { - _readableMessage = true; + if (_readableMessage) + { + _data.rewind(); + } + else + { + _data.flip(); + _readableMessage = true; + } } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index ccb3c0bf57..04f3c5ee17 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -33,7 +33,7 @@ import java.nio.charset.Charset; */ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage { - public static final String MIME_TYPE="jms/stream-message"; + public static final String MIME_TYPE="jms/stream-message"; private static final byte BOOLEAN_TYPE = (byte) 1; @@ -55,6 +55,8 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess private static final byte STRING_TYPE = (byte) 10; + private static final byte NULL_STRING_TYPE = (byte) 11; + /** * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read * a byte array in multiple chunks, hence this is used to track how much is left to be read @@ -89,7 +91,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess return MIME_TYPE; } - private byte readAndCheckType() throws MessageFormatException, MessageEOFException, + private byte readWireType() throws MessageFormatException, MessageEOFException, MessageNotReadableException { checkReadable(); @@ -105,22 +107,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public boolean readBoolean() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); boolean result; - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private boolean readBooleanImpl() @@ -130,20 +142,30 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public byte readByte() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); byte result; - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } return result; } @@ -155,24 +177,34 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public short readShort() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); short result; - switch (wireType) + try { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } return result; } @@ -190,15 +222,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess */ public char readChar() throws JMSException { - byte wireType = readAndCheckType(); - if (wireType != CHAR_TYPE) + int position = _data.position(); + byte wireType = readWireType(); + try { - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } } - else + catch (RuntimeException e) { - checkAvailable(2); - return readCharImpl(); + _data.position(position); + throw e; } } @@ -209,30 +251,40 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public int readInt() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); int result; - switch (wireType) + try { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private int readIntImpl() @@ -242,34 +294,44 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public long readLong() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); long result; - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private long readLongImpl() @@ -279,22 +341,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public float readFloat() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); float result; - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private float readFloatImpl() @@ -304,26 +376,36 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public double readDouble() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); double result; - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private double readDoubleImpl() @@ -333,50 +415,63 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public String readString() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); String result; - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private String readStringImpl() throws JMSException @@ -406,7 +501,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess // type discriminator checked separately so you get a MessageFormatException rather than // an EOF even in the case where both would be applicable checkAvailable(1); - byte wireType = readAndCheckType(); + byte wireType = readWireType(); if (wireType != BYTEARRAY_TYPE) { throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); @@ -431,18 +526,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess } } } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } - return readBytesImpl(bytes); + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; } private int readBytesImpl(byte[] bytes) { int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); _byteArrayRemaining -= count; - if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - } + if (count == 0) { return 0; @@ -456,62 +558,74 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public Object readObject() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); Object result = null; - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + result = new byte[size]; + readBytesImpl(new byte[size]); + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: result = null; - } - else - { - _byteArrayRemaining = size; - result = new byte[size]; - readBytesImpl(new byte[size]); - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } public void writeBoolean(boolean b) throws JMSException @@ -564,18 +678,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeString(String string) throws JMSException { - writeTypeDiscriminator(STRING_TYPE); - try + if (string == null) { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must write the null terminator ourselves - _data.put((byte)0); + writeTypeDiscriminator(NULL_STRING_TYPE); } - catch (CharacterCodingException e) + else { - JMSException ex = new JMSException("Unable to encode string: " + e); - ex.setLinkedException(e); - throw ex; + writeTypeDiscriminator(STRING_TYPE); + try + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte)0); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } } } @@ -601,11 +722,17 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeObject(Object object) throws JMSException { checkWritable(); + Class clazz = null; if (object == null) { - throw new NullPointerException("Argument must not be null"); + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); } - Class clazz = object.getClass(); + if (clazz == Byte.class) { writeByte((Byte) object); diff --git a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java index 842b2d7696..2c08f1e34a 100644 --- a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java +++ b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java @@ -36,7 +36,7 @@ public class JNDIBindConnectionFactory { public static final String CONNECTION_FACTORY_BINDING = "amq.ConnectionFactory"; - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI"; + public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI"; public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory"; public static final String DEFAULT_CONNECTION_URL = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; @@ -154,7 +154,7 @@ public class JNDIBindConnectionFactory } catch (NamingException e) { - + System.out.println("Operation failed: " + e); } // Perform the bind @@ -169,11 +169,11 @@ public class JNDIBindConnectionFactory } catch (NamingException amqe) { - + System.out.println("Operation failed: " + amqe); } catch (URLSyntaxException e) { - + System.out.println("Operation failed: " + e); } } diff --git a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java index 5f328a4107..07dc8c85b3 100644 --- a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java +++ b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java @@ -20,24 +20,25 @@ */ package org.apache.qpid.IBMPerfTest; -import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQQueue; -import org.apache.log4j.Logger; -import org.apache.log4j.Level; +import org.apache.qpid.client.AMQSession; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; -import javax.jms.*; -import java.util.Hashtable; import java.io.File; -import java.net.MalformedURLException; +import java.util.Hashtable; public class JNDIBindQueue -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI"; +{ + public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI"; public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory"; @@ -98,7 +99,7 @@ public class JNDIBindQueue } catch (JMSException closeE) { - + System.out.println("Connection closing failed: " + closeE); } } diff --git a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java index c31dce22cf..16bf0dcb7a 100644 --- a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java +++ b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java @@ -20,24 +20,25 @@ */ package org.apache.qpid.IBMPerfTest; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.log4j.Logger; -import org.apache.log4j.Level; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.Topic; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; -import java.util.Hashtable; import java.io.File; -import java.net.MalformedURLException; +import java.util.Hashtable; public class JNDIBindTopic -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI"; +{ + public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI"; public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory"; @@ -99,11 +100,9 @@ public class JNDIBindTopic } catch (JMSException closeE) { - + System.out.println("Operation failed: " + closeE); } } - - } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java index ef00f0b9f2..727881de96 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java @@ -168,24 +168,6 @@ public class StreamMessageTest extends TestCase } } - public void testWriteObjectThrowsNPE() throws Exception - { - try - { - JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); - bm.writeObject(null); - fail("expected exception did not occur"); - } - catch (NullPointerException n) - { - // ok - } - catch (Exception e) - { - fail("expected NullPointerException, got " + e); - } - } - public void testReadBoolean() throws Exception { JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); @@ -221,9 +203,34 @@ public class StreamMessageTest extends TestCase len = bm.readBytes(bytes); assertEquals(-1, len); len = bm.readBytes(bytes); + assertEquals(-1, len); + len = bm.readBytes(bytes); assertEquals(0, len); } + public void testReadBytesFollowedByPrimitive() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7, 8}); + bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7}); + bm.writeString("Foo"); + bm.reset(); + int len; + do + { + len = bm.readBytes(new byte[2]); + } + while (len == 2); + + do + { + len = bm.readBytes(new byte[2]); + } + while (len == 2); + + assertEquals("Foo", bm.readString()); + } + public void testReadMultipleByteArrays() throws Exception { JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); @@ -577,11 +584,11 @@ public class StreamMessageTest extends TestCase bm = TestMessageHelper.newJMSStreamMessage(); bm.writeString("2"); bm.reset(); - assertEquals((byte)2, bm.readByte()); + assertEquals((byte)2, bm.readByte()); bm.reset(); assertEquals((short)2, bm.readShort()); bm.reset(); - assertEquals((int)2, bm.readInt()); + assertEquals(2, bm.readInt()); bm.reset(); assertEquals((long)2, bm.readLong()); bm = TestMessageHelper.newJMSStreamMessage(); @@ -592,6 +599,16 @@ public class StreamMessageTest extends TestCase assertEquals(5.7d, bm.readDouble()); } + public void testNulls() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeString(null); + bm.writeObject(null); + bm.reset(); + assertNull(bm.readObject()); + assertNull(bm.readObject()); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(StreamMessageTest.class); diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index b9aee2087e..6213a9d318 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -94,25 +94,45 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected void routeAndTest(Message m, TestQueue... expected) throws AMQException { - routeAndTest(m, Arrays.asList(expected)); + routeAndTest(m, false, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException + { + routeAndTest(m, expectReturn, Arrays.asList(expected)); } protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException { - route(m); - for (TestQueue q : queues) + routeAndTest(m, false, expected); + } + + protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException + { + try { - if (expected.contains(q)) + route(m); + assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); + for (TestQueue q : queues) { - assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + if (expected.contains(q)) + { + assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; + } + else + { + assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + } } } + + catch (NoRouteException ex) + { + assertTrue("Expected "+m+" not to be returned",expectReturn); + } + } static FieldTable getHeaders(String... entries) diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 91520df3bf..c220442a78 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.framing.BasicPublishBody; public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase { @@ -52,6 +53,19 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q5, q6, q7, q8); routeAndTest(new Message("Message6", "F0002")); + + Message m7 = new Message("Message7", "XXXXX"); + + BasicPublishBody pb7 = m7.getPublishBody(); + pb7.mandatory = true; + routeAndTest(m7,true); + + Message m8 = new Message("Message8", "F0000"); + BasicPublishBody pb8 = m8.getPublishBody(); + pb8.mandatory = true; + routeAndTest(m8,false,q1); + + } public void testAny() throws AMQException @@ -71,6 +85,20 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase routeAndTest(new Message("Message6", "F0002")); } + public void testMandatory() throws AMQException + { + TestQueue q1 = bindDefault("F0000"); + Message m1 = new Message("Message1", "XXXXX"); + Message m2 = new Message("Message2", "F0000"); + BasicPublishBody pb1 = m1.getPublishBody(); + pb1.mandatory = true; + BasicPublishBody pb2 = m1.getPublishBody(); + pb2.mandatory = true; + routeAndTest(m1,true); + + + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(HeadersExchangeTest.class); diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java new file mode 100644 index 0000000000..4dffe3e75f --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -0,0 +1,151 @@ +package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.VMBrokerSetup;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.client.*;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+
+import javax.jms.*;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+
+public class ReturnUnroutableMandatoryMessageTest extends TestCase implements ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
+
+ private final List<Message> _bouncedMessageList = Collections.synchronizedList(new ArrayList<Message>());
+
+ static
+ {
+ String workdir = System.getProperty("QPID_WORK");
+ if (workdir == null || workdir.equals(""))
+ {
+ String tempdir = System.getProperty("java.io.tmpdir");
+ System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
+ System.setProperty("QPID_WORK", tempdir);
+ }
+// DOMConfigurator.configure("../broker/etc/log4j.xml");
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * Tests that mandatory message which are not routable are returned to the producer
+ *
+ * @throws Exception
+ */
+ public void testReturnUnroutableMandatoryMessage() throws Exception
+ {
+ _bouncedMessageList.clear();
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+
+ AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+ FieldTable ft = new PropertyFieldTable();
+ ft.setString("F1000","1");
+ MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
+
+
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+
+ con2.setExceptionListener(this);
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Need to start the "producer" connection in order to receive bounced messages
+ _logger.info("Starting producer connection");
+ con2.start();
+
+
+ MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false);
+ MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+
+
+ // First test - should neither be bounced nor routed
+ _logger.info("Sending non-routable non-mandatory message");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
+ nonMandatoryProducer.send(msg1);
+
+ // Second test - should be bounced
+ _logger.info("Sending non-routable mandatory message");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
+ mandatoryProducer.send(msg2);
+
+ // Third test - should be routed
+ _logger.info("Sending routable message");
+ TextMessage msg3 = producerSession.createTextMessage("msg3");
+ msg3.setStringProperty("F1000","1");
+ mandatoryProducer.send(msg3);
+
+
+
+ _logger.info("Starting consumer connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+ assertTrue("No message routed to receiver",tm != null);
+ assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText()));
+
+ try
+ {
+ Thread.sleep(1000L);
+ }
+ catch(InterruptedException e)
+ {
+ ;
+ }
+
+ assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1);
+ Message m = _bouncedMessageList.get(0);
+ assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2"));
+
+
+
+
+ con.close();
+ con2.close();
+
+
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class));
+ }
+
+ public void onException(JMSException jmsException)
+ {
+ _logger.warn("Caught exception on producer: ",jmsException);
+ Exception linkedException = jmsException.getLinkedException();
+ if(linkedException instanceof AMQNoRouteException)
+ {
+ AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
+ Message bounced = (Message) noRoute.getUndeliveredMessage();
+ _bouncedMessageList.add(bounced);
+ }
+ }
+}
|