summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-21 14:24:38 +0000
committerRobert Greig <rgreig@apache.org>2006-12-21 14:24:38 +0000
commit6d5154b0d5ad1fb32aadda06801c5dc8fdc958eb (patch)
treeadfaef57db9a1612b598e96f014109032b43d7ee
parent30740c7824aed3b1f3984ee9992fde365c72cd16 (diff)
downloadqpid-python-6d5154b0d5ad1fb32aadda06801c5dc8fdc958eb.tar.gz
Merge from trunk up to rev 486165
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@489368 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java13
-rw-r--r--java/client/example/pom.xml94
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/log4j.xml (renamed from java/client/src/old_test/java/org/apache/qpid/example/log4j.xml)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageFactory.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/publisher/MessageFactoryException.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorPublisher.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/publisher/Publisher.java)2
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/shared/ConnectionException.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/shared/ContextException.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/shared/FileUtils.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/shared/InitialContextHelper.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/shared/Statics.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties (renamed from java/client/src/old_test/java/org/apache/qpid/example/shared/example.properties)2
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java)2
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java)0
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java)21
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java (renamed from java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java)0
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java611
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java8
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java21
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java21
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java57
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java42
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java28
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java151
30 files changed, 773 insertions, 340 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 818e5b04b2..dbf4ef91db 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -206,7 +206,18 @@ public class HeadersExchange extends AbstractExchange
}
if (!routed)
{
- _logger.warn("Exchange " + getName() + ": message not routable.");
+
+ String msg = "Exchange " + getName() + ": message not routable.";
+
+ if (payload.getPublishBody().mandatory)
+ {
+ throw new NoRouteException(msg, payload);
+ }
+ else
+ {
+ _logger.warn(msg);
+ }
+
}
}
diff --git a/java/client/example/pom.xml b/java/client/example/pom.xml
new file mode 100644
index 0000000000..ac0081c00b
--- /dev/null
+++ b/java/client/example/pom.xml
@@ -0,0 +1,94 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-example</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ <name>Qpid Example</name>
+ <url>http://cwiki.apache.org/confluence/display/qpid</url>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid</artifactId>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ <amqj.logging.level>warn</amqj.logging.level>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>jmscts</groupId>
+ <artifactId>jmscts</artifactId>
+ <version>0.5-b2</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>amqj.noAutoCreateVMBroker</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>amqj.logging.level</name>
+ <value>${amqj.logging.level}</value>
+ </property>
+ <property>
+ <name>log4j.configuration</name>
+ <value>file:///${basedir}/src/main/java/log4j.properties</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/log4j.xml b/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml
index de64423a51..de64423a51 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/log4j.xml
+++ b/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
index b199d41432..b199d41432 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
index 88bcbbbccb..88bcbbbccb 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/FileMessageFactory.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
index 34360d6708..34360d6708 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MessageFactoryException.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
index 8784d340da..8784d340da 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
index 233c3fea0a..233c3fea0a 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/MonitorPublisher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
index be42e0e413..2bde4ec35c 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/Publisher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
@@ -75,7 +75,7 @@ public class Publisher
//lookup the example queue and use it
//Queue is non-exclusive and not deleted when last consumer detaches
- _destination = _session.createQueue((String)ctx.lookup("MyQueue"));
+ _destination = (Queue) ctx.lookup("MyQueue");
//create a message producer
_producer = _session.createProducer(_destination);
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
index 3335833c2d..3335833c2d 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/ConnectionException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
index 8723983862..8723983862 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/shared/ConnectionException.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/ContextException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
index 787cecd541..787cecd541 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/shared/ContextException.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/FileUtils.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java
index 54446cb6a7..54446cb6a7 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/shared/FileUtils.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
index b39892b688..b39892b688 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/shared/InitialContextHelper.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/Statics.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java
index c056f8a7da..c056f8a7da 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/shared/Statics.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/shared/example.properties b/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
index 82de41908f..7f513341a2 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/shared/example.properties
+++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
@@ -5,7 +5,7 @@ java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextF
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'
+connectionfactory.local = amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
index 9c195aef40..1d2e5e0e66 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
@@ -39,7 +39,7 @@ public class MonitoredSubscriber extends Subscriber
{
super();
//lookup queue name and append suffix
- _monitorDestinationName = _destinationName + Statics.MONITOR_QUEUE_SUFFIX;
+ _monitorDestinationName = _destination.toString() + Statics.MONITOR_QUEUE_SUFFIX;
}
/**
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
index d2f27da052..d2f27da052 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java
index 34c7d6c7bb..4e92a6c678 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java
@@ -45,7 +45,7 @@ public class Subscriber
protected static AMQConnectionFactory _connectionFactory;
- protected String _destinationName;
+ protected Destination _destination;
public Subscriber()
{
@@ -58,8 +58,8 @@ public class Subscriber
//then create a connection using the AMQConnectionFactory
_connectionFactory = (AMQConnectionFactory) ctx.lookup("local");
- //lookup queue name
- _destinationName = (String) ctx.lookup("MyQueue");
+ //lookup queue from context
+ _destination = (Destination) ctx.lookup("MyQueue");
}
catch (Exception e)
@@ -79,7 +79,6 @@ public class Subscriber
public ExampleMessageListener(String name)
{
_name = name;
-
}
/**
@@ -127,11 +126,8 @@ public class Subscriber
//create a transactional session
Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- //Queue is non-exclusive and not deleted when last consumer detaches
- Destination destination = session.createQueue(_destinationName);
-
//Create a consumer with a destination of our queue which will use defaults for prefetch etc
- _consumer = session.createConsumer(destination);
+ _consumer = session.createConsumer(_destination);
//give the message listener a name of it's own
_consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis()));
@@ -161,15 +157,6 @@ public class Subscriber
}
/**
- * Set destination (queue or topic) name
- * @param name
- */
- public void setDestinationName(String name)
- {
- _destinationName = name;
- }
-
- /**
* Stop consuming and close connection
*/
public void stop()
diff --git a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
index 32a0ef685c..32a0ef685c 100644
--- a/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index 77b3bd7566..bec0686ce4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -21,23 +21,20 @@
package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
import javax.jms.MessageEOFException;
-import javax.jms.MessageNotWriteableException;
import java.io.IOException;
import java.nio.charset.Charset;
-import java.nio.charset.CharacterCodingException;
/**
* @author Apache Software Foundation
*/
public abstract class AbstractBytesMessage extends AbstractJMSMessage
-{
+{
/**
* The default initial size of the buffer. The buffer expands automatically.
@@ -79,7 +76,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
{
_data.clear();
}
-
+
public String toBodyString() throws JMSException
{
checkReadable();
@@ -124,7 +121,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
return data;
}
}
-
+
/**
* Check that there is at least a certain number of bytes available to read
*
@@ -138,10 +135,4 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
throw new MessageEOFException("Unable to read " + len + " bytes");
}
}
-
- public void reset() throws JMSException
- {
- super.reset();
- _data.flip();
- }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 514287aea7..6bd4fd0297 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -384,15 +384,15 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
}
public void acknowledge() throws JMSException
- {
+ {
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_session != null)
{
if (_session.getAMQConnection().isClosed()){
throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
+ }
+
// we set multiple to true here since acknowledgement implies acknowledge of all previous messages
// received on the session
_session.acknowledgeMessage(_deliveryTag, true);
@@ -546,7 +546,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
public void reset() throws JMSException
{
- _readableMessage = true;
+ if (_readableMessage)
+ {
+ _data.rewind();
+ }
+ else
+ {
+ _data.flip();
+ _readableMessage = true;
+ }
}
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index ccb3c0bf57..04f3c5ee17 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -33,7 +33,7 @@ import java.nio.charset.Charset;
*/
public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage
{
- public static final String MIME_TYPE="jms/stream-message";
+ public static final String MIME_TYPE="jms/stream-message";
private static final byte BOOLEAN_TYPE = (byte) 1;
@@ -55,6 +55,8 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
private static final byte STRING_TYPE = (byte) 10;
+ private static final byte NULL_STRING_TYPE = (byte) 11;
+
/**
* This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
* a byte array in multiple chunks, hence this is used to track how much is left to be read
@@ -89,7 +91,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
return MIME_TYPE;
}
- private byte readAndCheckType() throws MessageFormatException, MessageEOFException,
+ private byte readWireType() throws MessageFormatException, MessageEOFException,
MessageNotReadableException
{
checkReadable();
@@ -105,22 +107,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public boolean readBoolean() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
boolean result;
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Boolean.parseBoolean(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private boolean readBooleanImpl()
@@ -130,20 +142,30 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public byte readByte() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
byte result;
- switch (wireType)
- {
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Byte.parseByte(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ try
+ {
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
return result;
}
@@ -155,24 +177,34 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public short readShort() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
short result;
- switch (wireType)
+ try
{
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Short.parseShort(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
return result;
}
@@ -190,15 +222,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
*/
public char readChar() throws JMSException
{
- byte wireType = readAndCheckType();
- if (wireType != CHAR_TYPE)
+ int position = _data.position();
+ byte wireType = readWireType();
+ try
{
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ if (wireType != CHAR_TYPE)
+ {
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
}
- else
+ catch (RuntimeException e)
{
- checkAvailable(2);
- return readCharImpl();
+ _data.position(position);
+ throw e;
}
}
@@ -209,30 +251,40 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public int readInt() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
int result;
- switch (wireType)
+ try
{
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Integer.parseInt(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private int readIntImpl()
@@ -242,34 +294,44 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public long readLong() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
long result;
- switch (wireType)
- {
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Long.parseLong(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ try
+ {
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private long readLongImpl()
@@ -279,22 +341,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public float readFloat() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
float result;
- switch (wireType)
- {
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Float.parseFloat(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ try
+ {
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private float readFloatImpl()
@@ -304,26 +376,36 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public double readDouble() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
double result;
- switch (wireType)
- {
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Double.parseDouble(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ try
+ {
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private double readDoubleImpl()
@@ -333,50 +415,63 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public String readString() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
String result;
- switch (wireType)
- {
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = String.valueOf(readBooleanImpl());
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = String.valueOf(readLongImpl());
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readIntImpl());
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = String.valueOf(readShortImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = String.valueOf(readByteImpl());
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readFloatImpl());
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = String.valueOf(readDoubleImpl());
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = String.valueOf(readCharImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ try
+ {
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ break;
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private String readStringImpl() throws JMSException
@@ -406,7 +501,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
// type discriminator checked separately so you get a MessageFormatException rather than
// an EOF even in the case where both would be applicable
checkAvailable(1);
- byte wireType = readAndCheckType();
+ byte wireType = readWireType();
if (wireType != BYTEARRAY_TYPE)
{
throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
@@ -431,18 +526,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
}
}
}
+ else if (_byteArrayRemaining == 0)
+ {
+ _byteArrayRemaining = -1;
+ return -1;
+ }
- return readBytesImpl(bytes);
+ int returnedSize = readBytesImpl(bytes);
+ if (returnedSize < bytes.length)
+ {
+ _byteArrayRemaining = -1;
+ }
+ return returnedSize;
}
private int readBytesImpl(byte[] bytes)
{
int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
_byteArrayRemaining -= count;
- if (_byteArrayRemaining == 0)
- {
- _byteArrayRemaining = -1;
- }
+
if (count == 0)
{
return 0;
@@ -456,62 +558,74 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public Object readObject() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
Object result = null;
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case BYTEARRAY_TYPE:
- checkAvailable(4);
- int size = _data.getInt();
- if (size == -1)
- {
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case BYTEARRAY_TYPE:
+ checkAvailable(4);
+ int size = _data.getInt();
+ if (size == -1)
+ {
+ result = null;
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ result = new byte[size];
+ readBytesImpl(new byte[size]);
+ }
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = readCharImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case NULL_STRING_TYPE:
result = null;
- }
- else
- {
- _byteArrayRemaining = size;
- result = new byte[size];
- readBytesImpl(new byte[size]);
- }
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = readCharImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
public void writeBoolean(boolean b) throws JMSException
@@ -564,18 +678,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public void writeString(String string) throws JMSException
{
- writeTypeDiscriminator(STRING_TYPE);
- try
+ if (string == null)
{
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must write the null terminator ourselves
- _data.put((byte)0);
+ writeTypeDiscriminator(NULL_STRING_TYPE);
}
- catch (CharacterCodingException e)
+ else
{
- JMSException ex = new JMSException("Unable to encode string: " + e);
- ex.setLinkedException(e);
- throw ex;
+ writeTypeDiscriminator(STRING_TYPE);
+ try
+ {
+ _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ // we must write the null terminator ourselves
+ _data.put((byte)0);
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException ex = new JMSException("Unable to encode string: " + e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
}
}
@@ -601,11 +722,17 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public void writeObject(Object object) throws JMSException
{
checkWritable();
+ Class clazz = null;
if (object == null)
{
- throw new NullPointerException("Argument must not be null");
+ // string handles the output of null values
+ clazz = String.class;
+ }
+ else
+ {
+ clazz = object.getClass();
}
- Class clazz = object.getClass();
+
if (clazz == Byte.class)
{
writeByte((Byte) object);
diff --git a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java
index 842b2d7696..2c08f1e34a 100644
--- a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java
+++ b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java
@@ -36,7 +36,7 @@ public class JNDIBindConnectionFactory
{
public static final String CONNECTION_FACTORY_BINDING = "amq.ConnectionFactory";
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI";
+ public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
public static final String DEFAULT_CONNECTION_URL = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
@@ -154,7 +154,7 @@ public class JNDIBindConnectionFactory
}
catch (NamingException e)
{
-
+ System.out.println("Operation failed: " + e);
}
// Perform the bind
@@ -169,11 +169,11 @@ public class JNDIBindConnectionFactory
}
catch (NamingException amqe)
{
-
+ System.out.println("Operation failed: " + amqe);
}
catch (URLSyntaxException e)
{
-
+ System.out.println("Operation failed: " + e);
}
}
diff --git a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
index 5f328a4107..07dc8c85b3 100644
--- a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
+++ b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
@@ -20,24 +20,25 @@
*/
package org.apache.qpid.IBMPerfTest;
-import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQQueue;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Level;
+import org.apache.qpid.client.AMQSession;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import javax.jms.*;
-import java.util.Hashtable;
import java.io.File;
-import java.net.MalformedURLException;
+import java.util.Hashtable;
public class JNDIBindQueue
-{
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI";
+{
+ public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
@@ -98,7 +99,7 @@ public class JNDIBindQueue
}
catch (JMSException closeE)
{
-
+ System.out.println("Connection closing failed: " + closeE);
}
}
diff --git a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
index c31dce22cf..16bf0dcb7a 100644
--- a/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
+++ b/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
@@ -20,24 +20,25 @@
*/
package org.apache.qpid.IBMPerfTest;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Level;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import java.util.Hashtable;
import java.io.File;
-import java.net.MalformedURLException;
+import java.util.Hashtable;
public class JNDIBindTopic
-{
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI";
+{
+ public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
@@ -99,11 +100,9 @@ public class JNDIBindTopic
}
catch (JMSException closeE)
{
-
+ System.out.println("Operation failed: " + closeE);
}
}
-
-
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
index ef00f0b9f2..727881de96 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
@@ -168,24 +168,6 @@ public class StreamMessageTest extends TestCase
}
}
- public void testWriteObjectThrowsNPE() throws Exception
- {
- try
- {
- JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
- bm.writeObject(null);
- fail("expected exception did not occur");
- }
- catch (NullPointerException n)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected NullPointerException, got " + e);
- }
- }
-
public void testReadBoolean() throws Exception
{
JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
@@ -221,9 +203,34 @@ public class StreamMessageTest extends TestCase
len = bm.readBytes(bytes);
assertEquals(-1, len);
len = bm.readBytes(bytes);
+ assertEquals(-1, len);
+ len = bm.readBytes(bytes);
assertEquals(0, len);
}
+ public void testReadBytesFollowedByPrimitive() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7, 8});
+ bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7});
+ bm.writeString("Foo");
+ bm.reset();
+ int len;
+ do
+ {
+ len = bm.readBytes(new byte[2]);
+ }
+ while (len == 2);
+
+ do
+ {
+ len = bm.readBytes(new byte[2]);
+ }
+ while (len == 2);
+
+ assertEquals("Foo", bm.readString());
+ }
+
public void testReadMultipleByteArrays() throws Exception
{
JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
@@ -577,11 +584,11 @@ public class StreamMessageTest extends TestCase
bm = TestMessageHelper.newJMSStreamMessage();
bm.writeString("2");
bm.reset();
- assertEquals((byte)2, bm.readByte());
+ assertEquals((byte)2, bm.readByte());
bm.reset();
assertEquals((short)2, bm.readShort());
bm.reset();
- assertEquals((int)2, bm.readInt());
+ assertEquals(2, bm.readInt());
bm.reset();
assertEquals((long)2, bm.readLong());
bm = TestMessageHelper.newJMSStreamMessage();
@@ -592,6 +599,16 @@ public class StreamMessageTest extends TestCase
assertEquals(5.7d, bm.readDouble());
}
+ public void testNulls() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeString(null);
+ bm.writeObject(null);
+ bm.reset();
+ assertNull(bm.readObject());
+ assertNull(bm.readObject());
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(StreamMessageTest.class);
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index b9aee2087e..6213a9d318 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -94,25 +94,45 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
{
- routeAndTest(m, Arrays.asList(expected));
+ routeAndTest(m, false, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
+ {
+ routeAndTest(m, expectReturn, Arrays.asList(expected));
}
protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
{
- route(m);
- for (TestQueue q : queues)
+ routeAndTest(m, false, expected);
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
+ {
+ try
{
- if (expected.contains(q))
+ route(m);
+ assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ for (TestQueue q : queues)
{
- assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
- }
- else
- {
- assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ if (expected.contains(q))
+ {
+ assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+ }
+ else
+ {
+ assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ }
}
}
+
+ catch (NoRouteException ex)
+ {
+ assertTrue("Expected "+m+" not to be returned",expectReturn);
+ }
+
}
static FieldTable getHeaders(String... entries)
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 91520df3bf..c220442a78 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
{
@@ -52,6 +53,19 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
q1, q2, q3, q4, q5, q6, q7, q8);
routeAndTest(new Message("Message6", "F0002"));
+
+ Message m7 = new Message("Message7", "XXXXX");
+
+ BasicPublishBody pb7 = m7.getPublishBody();
+ pb7.mandatory = true;
+ routeAndTest(m7,true);
+
+ Message m8 = new Message("Message8", "F0000");
+ BasicPublishBody pb8 = m8.getPublishBody();
+ pb8.mandatory = true;
+ routeAndTest(m8,false,q1);
+
+
}
public void testAny() throws AMQException
@@ -71,6 +85,20 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
routeAndTest(new Message("Message6", "F0002"));
}
+ public void testMandatory() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000");
+ Message m1 = new Message("Message1", "XXXXX");
+ Message m2 = new Message("Message2", "F0000");
+ BasicPublishBody pb1 = m1.getPublishBody();
+ pb1.mandatory = true;
+ BasicPublishBody pb2 = m1.getPublishBody();
+ pb2.mandatory = true;
+ routeAndTest(m1,true);
+
+
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(HeadersExchangeTest.class);
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
new file mode 100644
index 0000000000..4dffe3e75f
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -0,0 +1,151 @@
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.VMBrokerSetup;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.client.*;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+
+import javax.jms.*;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+
+public class ReturnUnroutableMandatoryMessageTest extends TestCase implements ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
+
+ private final List<Message> _bouncedMessageList = Collections.synchronizedList(new ArrayList<Message>());
+
+ static
+ {
+ String workdir = System.getProperty("QPID_WORK");
+ if (workdir == null || workdir.equals(""))
+ {
+ String tempdir = System.getProperty("java.io.tmpdir");
+ System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
+ System.setProperty("QPID_WORK", tempdir);
+ }
+// DOMConfigurator.configure("../broker/etc/log4j.xml");
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * Tests that mandatory message which are not routable are returned to the producer
+ *
+ * @throws Exception
+ */
+ public void testReturnUnroutableMandatoryMessage() throws Exception
+ {
+ _bouncedMessageList.clear();
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+
+ AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+ FieldTable ft = new PropertyFieldTable();
+ ft.setString("F1000","1");
+ MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
+
+
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+
+ con2.setExceptionListener(this);
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Need to start the "producer" connection in order to receive bounced messages
+ _logger.info("Starting producer connection");
+ con2.start();
+
+
+ MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false);
+ MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+
+
+ // First test - should neither be bounced nor routed
+ _logger.info("Sending non-routable non-mandatory message");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
+ nonMandatoryProducer.send(msg1);
+
+ // Second test - should be bounced
+ _logger.info("Sending non-routable mandatory message");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
+ mandatoryProducer.send(msg2);
+
+ // Third test - should be routed
+ _logger.info("Sending routable message");
+ TextMessage msg3 = producerSession.createTextMessage("msg3");
+ msg3.setStringProperty("F1000","1");
+ mandatoryProducer.send(msg3);
+
+
+
+ _logger.info("Starting consumer connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+ assertTrue("No message routed to receiver",tm != null);
+ assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText()));
+
+ try
+ {
+ Thread.sleep(1000L);
+ }
+ catch(InterruptedException e)
+ {
+ ;
+ }
+
+ assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1);
+ Message m = _bouncedMessageList.get(0);
+ assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2"));
+
+
+
+
+ con.close();
+ con2.close();
+
+
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class));
+ }
+
+ public void onException(JMSException jmsException)
+ {
+ _logger.warn("Caught exception on producer: ",jmsException);
+ Exception linkedException = jmsException.getLinkedException();
+ if(linkedException instanceof AMQNoRouteException)
+ {
+ AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
+ Message bounced = (Message) noRoute.getUndeliveredMessage();
+ _bouncedMessageList.add(bounced);
+ }
+ }
+}