summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java309
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java30
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java301
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java34
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java67
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java53
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java84
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java34
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java238
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/Message.java28
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/transacted/Config.java (renamed from java/client/src/test/java/org/apache/qpid/transacted/Config.java)0
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java (renamed from java/client/src/test/java/org/apache/qpid/transacted/Ping.java)0
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java (renamed from java/client/src/test/java/org/apache/qpid/transacted/Pong.java)0
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java (renamed from java/client/src/test/java/org/apache/qpid/transacted/Relay.java)0
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/transacted/Start.java (renamed from java/client/src/test/java/org/apache/qpid/transacted/Start.java)0
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java (renamed from java/client/src/test/java/org/apache/qpid/weblogic/ServiceProvider.java)0
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java (renamed from java/client/src/test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java)0
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java89
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java39
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java24
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java67
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java33
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java251
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java80
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java199
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (renamed from java/client/src/old_test/java/org/apache/qpid/transacted/TransactedTest.java)2
41 files changed, 1717 insertions, 481 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 5c13e7861f..c6f3f9c492 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,13 +79,19 @@ public abstract class AMQDestination implements Destination, Referenceable
protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
boolean isAutoDelete, String queueName)
{
+ this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false);
+ }
+
+ protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
+ boolean isAutoDelete, String queueName, boolean isDurable)
+ {
if (destinationName == null)
{
- throw new IllegalArgumentException("Destination name must not be null");
+ throw new IllegalArgumentException("Destination exchange must not be null");
}
if (exchangeName == null)
{
- throw new IllegalArgumentException("Exchange name must not be null");
+ throw new IllegalArgumentException("Exchange exchange must not be null");
}
if (exchangeClass == null)
{
@@ -97,6 +103,7 @@ public abstract class AMQDestination implements Destination, Referenceable
_isExclusive = isExclusive;
_isAutoDelete = isAutoDelete;
_queueName = queueName;
+ _isDurable = isDurable;
}
public String getEncodedName()
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8e93b19eea..21fc5c89c5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -23,13 +23,15 @@ package org.apache.qpid.client;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.server.handler.ExchangeBoundHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
@@ -37,6 +39,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
+
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.io.Serializable;
@@ -46,6 +49,8 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -66,6 +71,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
/**
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled
+ * correctly. Note this only keeps a record of subscriptions which have been created
+ * in the current instance. It does not remember subscriptions between executions of the
+ * client
+ */
+ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
+ new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+ private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
+ new ConcurrentHashMap<BasicMessageConsumer, String>();
+
+ /**
* Used in the consume method. We generate the consume tag on the client so that we can use the nowait
* feature.
*/
@@ -91,6 +107,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private Map _consumers = new ConcurrentHashMap();
/**
+ * Maps from destination to count of JMSMessageConsumers
+ */
+ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
+ new ConcurrentHashMap<Destination, AtomicInteger>();
+
+ /**
* Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
* need to be attached to a queue
*/
@@ -116,6 +138,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
+ private final AtomicLong _lastDeliveryTag = new AtomicLong();
+
+
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
@@ -154,10 +179,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (consumer == null)
{
_logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring...");
+ _logger.warn("Consumers that exist: " + _consumers);
+ _logger.warn("Session hashcode: " + System.identityHashCode(this));
}
else
{
+
consumer.notifyMessage(message, _channelId);
+
}
}
else
@@ -673,6 +702,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
}
+
+ public void acknowledge() throws JMSException
+ {
+ if (getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+ if (isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Session is already closed");
+ }
+ acknowledgeMessage(_lastDeliveryTag.get(), true);
+
+ }
+
+ void setLastDeliveredMessage(AbstractJMSMessage message)
+ {
+ _lastDeliveryTag.set(message.getDeliveryTag());
+ }
+
+
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
@@ -775,20 +825,38 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ false,
+ false,
+ null,
+ null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ false,
+ false,
+ messageSelector,
+ null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ noLocal,
+ false,
+ messageSelector,
+ null);
}
public MessageConsumer createConsumer(Destination destination,
@@ -798,7 +866,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
}
@@ -810,7 +878,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
}
public MessageConsumer createConsumer(Destination destination,
@@ -846,6 +914,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
final String selector,
final FieldTable rawSelector) throws JMSException
{
+ checkTemporaryDestination(destination);
+
return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
{
public Object operation() throws JMSException
@@ -870,7 +940,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
- registerConsumer(consumer);
+ registerConsumer(consumer, false);
}
catch (AMQException e)
{
@@ -879,11 +949,46 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throw ex;
}
+ synchronized(destination)
+ {
+ _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+ _destinationConsumerCount.get(destination).incrementAndGet();
+ }
+
return consumer;
}
}.execute(_connection);
}
+ private void checkTemporaryDestination(Destination destination)
+ throws JMSException
+ {
+ if((destination instanceof TemporaryDestination))
+ {
+ _logger.debug("destination is temporary");
+ final TemporaryDestination tempDest = (TemporaryDestination) destination;
+ if(tempDest.getSession() != this)
+ {
+ _logger.debug("destination is on different session");
+ throw new JMSException("Cannot consume from a temporary destination created onanother session");
+ }
+ if(tempDest.isDeleted())
+ {
+ _logger.debug("destination is deleted");
+ throw new JMSException("Cannot consume from a deleted destination");
+ }
+ }
+ }
+
+
+ public boolean hasConsumer(Destination destination)
+ {
+ AtomicInteger counter = _destinationConsumerCount.get(destination);
+
+ return (counter != null) && (counter.get() != 0);
+ }
+
+
public void declareExchange(String name, String type)
{
declareExchange(name, type, _connection.getProtocolHandler());
@@ -947,20 +1052,38 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param queueName
* @return the consumer tag generated by the broker
*/
- private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
- boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
+ private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
+ boolean nowait) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
- queueName, tag, noLocal,
- acknowledgeMode == Session.NO_ACKNOWLEDGE,
- exclusive, true);
+ consumer.setConsumerTag(tag);
+ // we must register the consumer in the map before we actually start listening
+ _consumers.put(tag, consumer);
- protocolHandler.writeFrame(jmsConsume);
- return tag;
+ try
+ {
+ AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
+ queueName, tag, consumer.isNoLocal(),
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
+ consumer.isExclusive(), nowait, null);
+ if (nowait)
+ {
+ protocolHandler.writeFrame(jmsConsume);
+ }
+ else
+ {
+ protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ }
+ }
+ catch (AMQException e)
+ {
+ // clean-up the map in the event of an error
+ _consumers.remove(tag);
+ throw e;
+ }
}
public Queue createQueue(String queueName) throws JMSException
@@ -1081,19 +1204,55 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
- /**
- * Note, currently this does not handle reuse of the same name with different topics correctly.
- * If a name is reused in creating a new subscriber with a different topic/selecto or no-local
- * flag then the subcriber will receive messages matching the old subscription AND the new one.
- * The spec states that the new one should replace the old one.
- * TODO: fix it.
- */
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " +
+ name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getQueueName()) &&
+ !isQueueBound(dest.getQueueName(), topic.getTopicName()))
+ {
+ deleteQueue(dest.getQueueName());
+ }
+ }
+
+ subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+
+ _subscriptions.put(name,subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+
+ return subscriber;
+ }
+
+ void deleteQueue(String queueName) throws JMSException
+ {
+ try
+ {
+ AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false,
+ false, true);
+ _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
/**
@@ -1104,9 +1263,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- return new TopicSubscriberAdaptor(dest, consumer);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ _subscriptions.put(name,subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ return subscriber;
}
public TopicPublisher createPublisher(Topic topic) throws JMSException
@@ -1132,32 +1294,59 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TemporaryQueue createTemporaryQueue() throws JMSException
{
checkNotClosed();
- return new AMQTemporaryQueue();
+ return new AMQTemporaryQueue(this);
}
public TemporaryTopic createTemporaryTopic() throws JMSException
{
checkNotClosed();
- return new AMQTemporaryTopic();
+ return new AMQTemporaryTopic(this);
}
public void unsubscribe(String name) throws JMSException
{
checkNotClosed();
-
- String queue = _connection.getClientID() + ":" + name;
-
- AMQFrame queueDeclareFrame = QueueDeclareBody.createAMQFrame(_channelId,0,queue,true,false, false, false, true, null);
-
- try {
- AMQMethodEvent event = _connection.getProtocolHandler().syncWrite(queueDeclareFrame,QueueDeclareOkBody.class);
- // if this method doen't throw an exception means we have received a queue declare ok.
- } catch (AMQException e) {
- throw new javax.jms.InvalidDestinationException("This destination doesn't exist");
- }
- //send a queue.delete for the subscription
- AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true);
- _connection.getProtocolHandler().writeFrame(frame);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ // send a queue.delete for the subscription
+ deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ _subscriptions.remove(name);
+ _reverseSubscriptionMap.remove(subscriber);
+ }
+ else
+ {
+ if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection)))
+ {
+ deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ }
+ else
+ {
+ throw new InvalidDestinationException("Unknown subscription exchange:" + name);
+ }
+ }
+ }
+
+ boolean isQueueBound(String queueName) throws JMSException
+ {
+ return isQueueBound(queueName, null);
+ }
+
+ boolean isQueueBound(String queueName, String routingKey) throws JMSException
+ {
+ AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
+ routingKey, queueName);
+ AMQMethodEvent response = null;
+ try
+ {
+ response = _connection.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+ return (responseBody.replyCode == ExchangeBoundHandler.OK);
}
private void checkTransacted() throws JMSException
@@ -1263,7 +1452,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param consumer
* @throws AMQException
*/
- void registerConsumer(BasicMessageConsumer consumer) throws AMQException
+ void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException
{
AMQDestination amqd = consumer.getDestination();
@@ -1275,22 +1464,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
- consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode());
-
- consumer.setConsumerTag(consumerTag);
- _consumers.put(consumerTag, consumer);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait);
}
/**
* Called by the MessageConsumer when closing, to deregister the consumer from the
* map from consumerTag to consumer instance.
*
- * @param consumerTag the consumer tag, that was broker-generated
+ * @param consumer the consum
*/
- void deregisterConsumer(String consumerTag)
+ void deregisterConsumer(BasicMessageConsumer consumer)
{
- _consumers.remove(consumerTag);
+ _consumers.remove(consumer.getConsumerTag());
+ String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+ if(subscriptionName != null)
+ {
+ _subscriptions.remove(subscriptionName);
+ }
+
+ Destination dest = consumer.getDestination();
+ synchronized(dest)
+ {
+ if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ {
+ _destinationConsumerCount.remove(dest);
+ }
+ }
}
private void registerProducer(long producerId, MessageProducer producer)
@@ -1338,7 +1537,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (Iterator it = consumers.iterator(); it.hasNext();)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
- registerConsumer(consumer);
+ registerConsumer(consumer, true);
}
}
@@ -1359,12 +1558,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private void checkValidTopic(Topic topic) throws InvalidDestinationException
+ private void checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
{
throw new javax.jms.InvalidDestinationException("Invalid Topic");
}
+ if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+ {
+ throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
+ }
}
private void checkValidQueue(Queue queue) throws InvalidDestinationException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
index 6b41ea0112..81fee69f90 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
@@ -26,25 +26,45 @@ import javax.jms.TemporaryQueue;
/**
* AMQ implementation of a TemporaryQueue.
*/
-final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue
+final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
{
+ private final AMQSession _session;
+ private boolean _deleted;
+
/**
* Create a new instance of an AMQTemporaryQueue
*/
- public AMQTemporaryQueue()
+ public AMQTemporaryQueue(AMQSession session)
{
super("TempQueue" + Long.toString(System.currentTimeMillis()), true);
+ _session = session;
}
/**
* @see javax.jms.TemporaryQueue#delete()
*/
- public void delete() throws JMSException
+ public synchronized void delete() throws JMSException
{
- throw new UnsupportedOperationException("Delete not supported, " +
- "will auto-delete when connection closed");
+ if(_session.hasConsumer(this))
+ {
+ throw new JMSException("Temporary Queue has consumers so cannot be deleted");
+ }
+ _deleted = true;
+
+ // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
+ // by the server when there are no more subscriptions to that queue. This is probably not
+ // quite right for JMSCompliance.
}
+ public AMQSession getSession()
+ {
+ return _session;
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
index 0ba5cb3c3a..241a9abc9b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
@@ -26,15 +26,18 @@ import javax.jms.TemporaryTopic;
/**
* AMQ implementation of TemporaryTopic.
*/
-class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic
+class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDestination
{
+ private final AMQSession _session;
+ private boolean _deleted;
/**
* Create new temporary topic.
*/
- public AMQTemporaryTopic()
+ public AMQTemporaryTopic(AMQSession session)
{
super("TempQueue" + Long.toString(System.currentTimeMillis()));
+ _session = session;
}
/**
@@ -42,8 +45,25 @@ class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic
*/
public void delete() throws JMSException
{
- throw new UnsupportedOperationException("Delete not supported, " +
- "will auto-delete when connection closed");
+ if(_session.hasConsumer(this))
+ {
+ throw new JMSException("Temporary Topic has consumers so cannot be deleted");
+ }
+
+ _deleted = true;
+ // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
+ // by the server when there are no more subscriptions to that queue. This is probably not
+ // quite right for JMSCompliance.
+ }
+
+ public AMQSession getSession()
+ {
+ return _session;
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 4dd38eea18..39304f3f4c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,20 +40,25 @@ public class AMQTopic extends AMQDestination implements Topic
public AMQTopic(String name)
{
- super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, true, null);
- _isDurable = false;
+ this(name, true, null, false);
}
- /**
- * Constructor for use in creating a topic to represent a durable subscription
- * @param topic
- * @param clientId
- * @param subscriptionName
- */
- public AMQTopic(AMQTopic topic, String clientId, String subscriptionName)
+ public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable)
+ {
+ super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
+ queueName, isDurable);
+ }
+
+ public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+ throws JMSException
+ {
+ return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection),
+ true);
+ }
+
+ public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
- super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getDestinationName(), true, false, clientId + ":" + subscriptionName);
- _isDurable = true;
+ return connection.getClientID() + ":" + subscriptionName;
}
public String getTopicName() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 4fb62b49fc..9f9038fddd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -214,10 +214,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
//handle case where connection has already been started, and the dispatcher is blocked
//doing a put on the _synchronousQueue
- Object msg = _synchronousQueue.poll();
- if (msg != null)
+ AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
+ if (jmsMsg != null)
{
- AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg;
+ _session.setLastDeliveredMessage(jmsMsg);
messageListener.onMessage(jmsMsg);
postDeliver(jmsMsg);
}
@@ -280,7 +280,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
checkPreConditions();
-
+
acquireReceiving();
try
@@ -297,12 +297,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
+ _session.setLastDeliveredMessage(m);
postDeliver(m);
}
+
return m;
}
catch (InterruptedException e)
{
+ _logger.warn("Interrupted: " + e, e);
return null;
}
finally
@@ -323,8 +326,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
+ _session.setLastDeliveredMessage(m);
postDeliver(m);
}
+
return m;
}
finally
@@ -423,6 +428,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
//we do not need a lock around the test above, and the dispatch below as it is invalid
//for an application to alter an installed listener while the session is started
+ _session.setLastDeliveredMessage(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
@@ -459,8 +465,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- private void postDeliver(AbstractJMSMessage msg)
+ private void postDeliver(AbstractJMSMessage msg) throws JMSException
{
+ msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
case Session.DUPS_OK_ACKNOWLEDGE:
@@ -522,7 +529,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private void deregisterConsumer()
{
- _session.deregisterConsumer(_consumerTag);
+ _session.deregisterConsumer(this);
}
public String getConsumerTag()
@@ -531,18 +538,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
public void setConsumerTag(String consumerTag)
- {
+ {
_consumerTag = consumerTag;
}
public AMQSession getSession() {
return _session;
}
-
+
private void checkPreConditions() throws JMSException{
-
+
this.checkNotClosed();
-
+
if(_session == null || _session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 8c53d93de6..e11d70cf41 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -24,11 +24,13 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
import javax.jms.*;
import java.io.UnsupportedEncodingException;
+import java.util.Enumeration;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -140,7 +142,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setDisableMessageID(boolean b) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
checkNotClosed();
// IGNORED
}
@@ -154,7 +156,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setDisableMessageTimestamp(boolean b) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
_disableTimestamps = b;
}
@@ -166,11 +168,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setDeliveryMode(int i) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
- " is illegal");
+ " is illegal");
}
_deliveryMode = i;
}
@@ -183,7 +185,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setPriority(int i) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
if (i < 0 || i > 9)
{
throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
@@ -199,7 +201,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void setTimeToLive(long l) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
if (l < 0)
{
throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
@@ -227,33 +229,36 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
+
+
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+ sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive,
_mandatory, _immediate);
}
}
public void send(Message message, int deliveryMode) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
+
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
_mandatory, _immediate);
}
}
public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
_mandatory, immediate);
}
}
@@ -261,23 +266,23 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode, int priority,
long timeToLive) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
+ sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory,
_immediate);
}
}
public void send(Destination destination, Message message) throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+ sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
_mandatory, _immediate);
}
}
@@ -286,12 +291,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
int priority, long timeToLive)
throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
_mandatory, _immediate);
}
}
@@ -305,7 +310,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
mandatory, _immediate);
}
}
@@ -314,12 +319,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
mandatory, immediate);
}
}
@@ -329,27 +334,158 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
boolean immediate, boolean waitUntilSent)
throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
mandatory, immediate, waitUntilSent);
}
}
+
+ private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
+ {
+ if (message instanceof AbstractJMSMessage)
+ {
+ return (AbstractJMSMessage) message;
+ }
+ else
+ {
+ AbstractJMSMessage newMessage;
+
+ if (message instanceof BytesMessage)
+ {
+ BytesMessage bytesMessage = (BytesMessage) message;
+ bytesMessage.reset();
+
+ JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage();
+
+
+ byte[] buf = new byte[1024];
+
+ int len;
+
+ while ((len = bytesMessage.readBytes(buf)) != -1)
+ {
+ nativeMsg.writeBytes(buf, 0, len);
+ }
+
+ newMessage = nativeMsg;
+ }
+ else if (message instanceof MapMessage)
+ {
+ MapMessage origMessage = (MapMessage) message;
+ MapMessage nativeMessage = _session.createMapMessage();
+
+ Enumeration mapNames = origMessage.getMapNames();
+ while (mapNames.hasMoreElements())
+ {
+ String name = (String) mapNames.nextElement();
+ nativeMessage.setObject(name, origMessage.getObject(name));
+ }
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else if (message instanceof ObjectMessage)
+ {
+ ObjectMessage origMessage = (ObjectMessage) message;
+ ObjectMessage nativeMessage = _session.createObjectMessage();
+
+ nativeMessage.setObject(origMessage.getObject());
+
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else if (message instanceof TextMessage)
+ {
+ TextMessage origMessage = (TextMessage) message;
+ TextMessage nativeMessage = _session.createTextMessage();
+
+ nativeMessage.setText(origMessage.getText());
+
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else if (message instanceof StreamMessage)
+ {
+ StreamMessage origMessage = (StreamMessage) message;
+ StreamMessage nativeMessage = _session.createStreamMessage();
+
+
+ try
+ {
+ origMessage.reset();
+ while (true)
+ {
+ nativeMessage.writeObject(origMessage.readObject());
+ }
+ }
+ catch (MessageEOFException e)
+ {
+ ;//
+ }
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else
+ {
+ newMessage = (AbstractJMSMessage) _session.createMessage();
+
+ }
+
+ Enumeration propertyNames = message.getPropertyNames();
+ while (propertyNames.hasMoreElements())
+ {
+ String propertyName = String.valueOf(propertyNames.nextElement());
+ if (!propertyName.startsWith("JMSX_"))
+ {
+ Object value = message.getObjectProperty(propertyName);
+ newMessage.setObjectProperty(propertyName, value);
+ }
+ }
+
+ newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+
+ int priority = message.getJMSPriority();
+ if (priority < 0)
+ {
+ priority = 0;
+ }
+ else if (priority > 9)
+ {
+ priority = 9;
+ }
+
+ newMessage.setJMSPriority(priority);
+ if (message.getJMSReplyTo() != null)
+ {
+ newMessage.setJMSReplyTo(message.getJMSReplyTo());
+ }
+ newMessage.setJMSType(message.getJMSType());
+
+
+ if (newMessage != null)
+ {
+ return newMessage;
+ }
+ else
+ {
+ throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName());
+ }
+ }
+ }
+
+
private void validateDestination(Destination destination) throws JMSException
{
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: " +
- (destination != null ? destination.getClass() : null));
+ (destination != null ? destination.getClass() : null));
}
- declareDestination((AMQDestination)destination);
+ declareDestination((AMQDestination) destination);
}
- protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+ protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
long timeToLive, boolean mandatory, boolean immediate) throws JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
@@ -357,8 +493,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
/**
* The caller of this method must hold the failover mutex.
+ *
* @param destination
- * @param message
+ * @param origMessage
* @param deliveryMode
* @param priority
* @param timeToLive
@@ -366,9 +503,12 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* @param immediate
* @throws JMSException
*/
- protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+ protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
+ checkTemporaryDestination(destination);
+
+ AbstractJMSMessage message = convertToNativeMessage(origMessage);
AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
destination.getRoutingKey(), mandatory, immediate);
@@ -424,11 +564,42 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
_protocolHandler.writeFrame(compositeFrame, wait);
+
+
+ if (message != origMessage)
+ {
+ _logger.warn("Updating original message");
+ origMessage.setJMSPriority(message.getJMSPriority());
+ origMessage.setJMSTimestamp(message.getJMSTimestamp());
+ _logger.warn("Setting JMSExpiration:" + message.getJMSExpiration());
+ origMessage.setJMSExpiration(message.getJMSExpiration());
+ origMessage.setJMSMessageID(message.getJMSMessageID());
+ }
+ }
+
+ private void checkTemporaryDestination(AMQDestination destination) throws JMSException
+ {
+ if(destination instanceof TemporaryDestination)
+ {
+ _logger.debug("destination is temporary destination");
+ TemporaryDestination tempDest = (TemporaryDestination) destination;
+ if(tempDest.getSession().isClosed())
+ {
+ _logger.debug("session is closed");
+ throw new JMSException("Session for temporary destination has been closed");
+ }
+ if(tempDest.isDeleted())
+ {
+ _logger.debug("destination is deleted");
+ throw new JMSException("Cannot send to a deleted temporary destination");
+ }
+ }
}
/**
* Create content bodies. This will split a large message into numerous bodies depending on the negotiated
* maximum frame size.
+ *
* @param payload
* @return the array of content bodies
*/
@@ -458,8 +629,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
for (int i = 0; i < bodies.length; i++)
{
bodies[i] = new ContentBody();
- payload.position((int)framePayloadMax * i);
- int length = (remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining;
+ payload.position((int) framePayloadMax * i);
+ int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
bodies[i].payload = payload.slice();
remaining -= length;
@@ -480,32 +651,42 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
_encoding = encoding;
}
- private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
- checkNotClosed();
+ private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
+ {
+ checkNotClosed();
+
+ if (_session == null || _session.isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+ }
+
+ private void checkInitialDestination()
+ {
+ if (_destination == null)
+ {
+ throw new UnsupportedOperationException("Destination is null");
+ }
+ }
- if(_session == null || _session.isClosed()){
- throw new javax.jms.IllegalStateException("Invalid Session");
- }
- }
+ private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
+ {
+ if (_destination != null && suppliedDestination != null)
+ {
+ throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+ }
- private void checkInitialDestination(){
- if(_destination == null){
- throw new UnsupportedOperationException("Destination is null");
- }
- }
+ if (suppliedDestination == null)
+ {
+ throw new InvalidDestinationException("Supplied Destination was invalid");
+ }
- private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
- if (_destination != null && suppliedDestination != null){
- throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
- }
- if (suppliedDestination == null){
- throw new InvalidDestinationException("Supplied Destination was invalid");
- }
- }
+ }
- public AMQSession getSession() {
- return _session;
- }
+ public AMQSession getSession()
+ {
+ return _session;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
new file mode 100644
index 0000000000..34ec49436e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSAMQException extends JMSException
+{
+ public JMSAMQException(AMQException s)
+ {
+ super(s.getMessage(), String.valueOf(s.getErrorCode()));
+ setLinkedException(s);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
index f90cc97a80..c8de298ba1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
@@ -10,119 +10,124 @@ import javax.jms.QueueSender;
public class QueueSenderAdapter implements QueueSender {
- private MessageProducer delegate;
- private Queue queue;
+ private MessageProducer _delegate;
+ private Queue _queue;
private boolean closed = false;
public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){
- delegate = msgProducer;
- this.queue = queue;
+ _delegate = msgProducer;
+ _queue = queue;
}
public Queue getQueue() throws JMSException {
checkPreConditions();
- return queue;
+ return _queue;
}
public void send(Message msg) throws JMSException {
checkPreConditions();
- delegate.send(msg);
+ _delegate.send(msg);
}
public void send(Queue queue, Message msg) throws JMSException {
- checkPreConditions();
- delegate.send(queue, msg);
+ checkPreConditions(queue);
+ _delegate.send(queue, msg);
}
public void publish(Message msg, int deliveryMode, int priority, long timeToLive)
throws JMSException {
checkPreConditions();
- delegate.send(msg, deliveryMode,priority,timeToLive);
+ _delegate.send(msg, deliveryMode,priority,timeToLive);
}
public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive)
throws JMSException {
- checkPreConditions();
- delegate.send(queue,msg, deliveryMode,priority,timeToLive);
+ checkPreConditions(queue);
+ _delegate.send(queue,msg, deliveryMode,priority,timeToLive);
}
public void close() throws JMSException {
- delegate.close();
+ _delegate.close();
closed = true;
}
public int getDeliveryMode() throws JMSException {
checkPreConditions();
- return delegate.getDeliveryMode();
+ return _delegate.getDeliveryMode();
}
public Destination getDestination() throws JMSException {
checkPreConditions();
- return delegate.getDestination();
+ return _delegate.getDestination();
}
public boolean getDisableMessageID() throws JMSException {
checkPreConditions();
- return delegate.getDisableMessageID();
+ return _delegate.getDisableMessageID();
}
public boolean getDisableMessageTimestamp() throws JMSException {
checkPreConditions();
- return delegate.getDisableMessageTimestamp();
+ return _delegate.getDisableMessageTimestamp();
}
public int getPriority() throws JMSException {
checkPreConditions();
- return delegate.getPriority();
+ return _delegate.getPriority();
}
public long getTimeToLive() throws JMSException {
checkPreConditions();
- return delegate.getTimeToLive();
+ return _delegate.getTimeToLive();
}
public void send(Destination dest, Message msg) throws JMSException {
- checkPreConditions();
- delegate.send(dest,msg);
+ checkPreConditions((Queue)dest);
+ _delegate.send(dest,msg);
}
public void send(Message msg, int deliveryMode, int priority, long timeToLive)
throws JMSException {
checkPreConditions();
- delegate.send(msg, deliveryMode,priority,timeToLive);
+ _delegate.send(msg, deliveryMode,priority,timeToLive);
}
public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException {
- checkPreConditions();
- delegate.send(dest,msg, deliveryMode,priority,timeToLive);
+ checkPreConditions((Queue)dest);
+ _delegate.send(dest,msg, deliveryMode,priority,timeToLive);
}
public void setDeliveryMode(int deliveryMode) throws JMSException {
checkPreConditions();
- delegate.setDeliveryMode(deliveryMode);
+ _delegate.setDeliveryMode(deliveryMode);
}
public void setDisableMessageID(boolean disableMessageID) throws JMSException {
checkPreConditions();
- delegate.setDisableMessageID(disableMessageID);
+ _delegate.setDisableMessageID(disableMessageID);
}
public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
checkPreConditions();
- delegate.setDisableMessageTimestamp(disableMessageTimestamp);
+ _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
}
public void setPriority(int priority) throws JMSException {
checkPreConditions();
- delegate.setPriority(priority);
+ _delegate.setPriority(priority);
}
public void setTimeToLive(long timeToLive) throws JMSException {
checkPreConditions();
- delegate.setTimeToLive(timeToLive);
+ _delegate.setTimeToLive(timeToLive);
}
-
- private void checkPreConditions() throws IllegalStateException, IllegalStateException {
+
+ private void checkPreConditions() throws IllegalStateException, IllegalStateException
+ {
+ checkPreConditions(_queue);
+ }
+
+ private void checkPreConditions(Queue queue) throws IllegalStateException, IllegalStateException {
if (closed){
throw new javax.jms.IllegalStateException("Publisher is closed");
}
@@ -131,7 +136,7 @@ public class QueueSenderAdapter implements QueueSender {
throw new UnsupportedOperationException("Queue is null");
}
- AMQSession session = ((BasicMessageProducer)delegate).getSession();
+ AMQSession session = ((BasicMessageProducer) _delegate).getSession();
if(session == null || session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
diff --git a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
new file mode 100644
index 0000000000..8c11672a65
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
@@ -0,0 +1,17 @@
+package org.apache.qpid.client;
+
+import javax.jms.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * so that operations related to their "temporary-ness" can be abstracted out.
+ */
+interface TemporaryDestination extends Destination
+{
+
+ public void delete() throws JMSException;
+ public AMQSession getSession();
+ public boolean isDeleted();
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
index 014c7c3311..dbc7b72813 100644
--- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
@@ -35,10 +35,10 @@ import javax.jms.TopicSubscriber;
class TopicSubscriberAdaptor implements TopicSubscriber
{
private final Topic _topic;
- private final MessageConsumer _consumer;
+ private final BasicMessageConsumer _consumer;
private final boolean _noLocal;
- TopicSubscriberAdaptor(Topic topic, MessageConsumer consumer, boolean noLocal)
+ TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal)
{
_topic = topic;
_consumer = consumer;
@@ -119,4 +119,10 @@ class TopicSubscriberAdaptor implements TopicSubscriber
throw new javax.jms.IllegalStateException("Invalid Session");
}
}
+
+ BasicMessageConsumer getMessageConsumer()
+ {
+ return _consumer;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
new file mode 100644
index 0000000000..858726745e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ExchangeBoundOkMethodHandler.class);
+ private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler();
+
+ public static ExchangeBoundOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeBoundOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
+ _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " +
+ body.replyText);
+ }
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
new file mode 100644
index 0000000000..3271a715a2
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.log4j.Logger;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class);
+ private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
+
+ public static QueueDeleteOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueDeleteOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ if (_logger.isDebugEnabled())
+ {
+ QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
+ _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
+ }
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index c1ed88b167..75e84fee96 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -32,16 +32,18 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.JmsNotImplementedException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.JMSPropertyFieldTable;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageFormatException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;
-public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message
+public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
@@ -50,7 +52,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
protected ByteBuffer _data;
private boolean _readableProperties = false;
private boolean _readableMessage = false;
-
+ private Destination _destination;
+
protected AbstractJMSMessage(ByteBuffer data)
{
super(new BasicContentHeaderProperties());
@@ -174,12 +177,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
public Destination getJMSDestination() throws JMSException
{
// TODO: implement this once we have sorted out how to figure out the exchange class
- throw new JmsNotImplementedException();
+ return _destination;
}
public void setJMSDestination(Destination destination) throws JMSException
{
- throw new JmsNotImplementedException();
+ _destination = destination;
}
public int getJMSDeliveryMode() throws JMSException
@@ -234,7 +237,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
public void clearProperties() throws JMSException
{
- getJmsContentHeaderProperties().getHeaders().clear();
+ getJmsContentHeaderProperties().getJMSHeaders().clear();
_readableProperties = false;
}
@@ -249,138 +252,139 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
public boolean propertyExists(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().propertyExists(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().propertyExists(propertyName);
}
public boolean getBooleanProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName);
+
+ return getJmsContentHeaderProperties().getJMSHeaders().getBoolean(propertyName);
}
public byte getByteProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getByte(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getByte(propertyName);
}
public short getShortProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getShort(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getShort(propertyName);
}
public int getIntProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getInteger(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getInteger(propertyName);
}
public long getLongProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getLong(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getLong(propertyName);
}
public float getFloatProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getFloat(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getFloat(propertyName);
}
public double getDoubleProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getDouble(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getDouble(propertyName);
}
public String getStringProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getString(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getString(propertyName);
}
public Object getObjectProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getObject(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getObject(propertyName);
}
public Enumeration getPropertyNames() throws JMSException
{
- return getJmsContentHeaderProperties().getHeaders().getPropertyNames();
+ return getJmsContentHeaderProperties().getJMSHeaders().getPropertyNames();
}
public void setBooleanProperty(String propertyName, boolean b) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setBoolean(propertyName, b);
+ getJmsContentHeaderProperties().getJMSHeaders().setBoolean(propertyName, b);
}
public void setByteProperty(String propertyName, byte b) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setByte(propertyName, new Byte(b));
+ getJmsContentHeaderProperties().getJMSHeaders().setByte(propertyName, new Byte(b));
}
public void setShortProperty(String propertyName, short i) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setShort(propertyName, new Short(i));
+ getJmsContentHeaderProperties().getJMSHeaders().setShort(propertyName, new Short(i));
}
public void setIntProperty(String propertyName, int i) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setInteger(propertyName, new Integer(i));
+ getJmsContentHeaderProperties().getJMSHeaders().setInteger(propertyName, new Integer(i));
}
public void setLongProperty(String propertyName, long l) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setLong(propertyName, new Long(l));
+ getJmsContentHeaderProperties().getJMSHeaders().setLong(propertyName, new Long(l));
}
public void setFloatProperty(String propertyName, float f) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setFloat(propertyName, new Float(f));
+ getJmsContentHeaderProperties().getJMSHeaders().setFloat(propertyName, new Float(f));
}
public void setDoubleProperty(String propertyName, double v) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setDouble(propertyName, new Double(v));
+ getJmsContentHeaderProperties().getJMSHeaders().setDouble(propertyName, new Double(v));
}
public void setStringProperty(String propertyName, String value) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setString(propertyName, value);
+ getJmsContentHeaderProperties().getJMSHeaders().setString(propertyName, value);
}
public void setObjectProperty(String propertyName, Object object) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object);
+ getJmsContentHeaderProperties().getJMSHeaders().setObject(propertyName, object);
}
protected void removeProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().remove(propertyName);
+ getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName);
}
- public void acknowledge() throws JMSException
+ public void acknowledgeThis() throws JMSException
{
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
@@ -397,6 +401,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
}
}
+ public void acknowledge() throws JMSException
+ {
+ if(_session != null)
+ {
+ _session.acknowledge();
+ }
+ }
+
/**
* This forces concrete classes to implement clearBody()
@@ -426,13 +438,13 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
buf.append("\nAMQ message number: ").append(_deliveryTag);
buf.append("\nProperties:");
- if (getJmsContentHeaderProperties().getHeaders().isEmpty())
+ if (getJmsContentHeaderProperties().getJMSHeaders().isEmpty())
{
buf.append("<NONE>");
}
else
{
- buf.append('\n').append(getJmsContentHeaderProperties().getHeaders());
+ buf.append('\n').append(getJmsContentHeaderProperties().getJMSHeaders());
}
return buf.toString();
}
@@ -462,9 +474,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
{
throw new IllegalArgumentException("Property name must not be the empty string");
}
-
- // Call to ensure that the it has been set.
- getJmsContentHeaderProperties().getHeaders();
}
public BasicContentHeaderProperties getJmsContentHeaderProperties()
@@ -478,14 +487,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
// position beyond the start
if (_data != null)
{
- if (!_readableMessage)
- {
- _data.flip();
- }
- else
- {
- _data.rewind();
- }
+ reset();
}
return _data;
}
@@ -524,7 +526,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
return !_readableMessage;
}
- public void reset() throws JMSException
+ public void reset()
{
if (_readableMessage)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index 456d4d520c..debabfd559 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -30,6 +30,9 @@ import javax.jms.MessageFormatException;
import javax.jms.MessageEOFException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CharsetDecoder;
+import java.nio.CharBuffer;
public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
{
@@ -149,10 +152,27 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
checkReadable();
// we check only for one byte since theoretically the string could be only a
// single byte when using UTF-8 encoding
- checkAvailable(1);
+
try
{
- return _data.getString(Charset.forName("UTF-8").newDecoder());
+ short length = readShort();
+ if(length == 0)
+ {
+ return "";
+ }
+ else
+ {
+ CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+ ByteBuffer encodedString = _data.slice();
+ encodedString.limit(length);
+ _data.position(_data.position()+length);
+ CharBuffer string = decoder.decode(encodedString.buf());
+
+ return string.toString();
+ }
+
+
+
}
catch (CharacterCodingException e)
{
@@ -257,9 +277,15 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
checkWritable();
try
{
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
+ java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
+
+ _data.putShort((short)encodedString.limit());
+ _data.put(encodedString);
+
+ //_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must add the null terminator manually
- _data.put((byte)0);
+ //_data.put((byte)0);
}
catch (CharacterCodingException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index 85d434e4eb..f69bed0fc0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -25,6 +25,8 @@ import org.apache.qpid.framing.PropertyFieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.EncodingUtils;
+import org.apache.qpid.framing.JMSPropertyFieldTable;
+import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -39,7 +41,7 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa
public static final String MIME_TYPE = "jms/map-message";
- private PropertyFieldTable _map;
+ private JMSPropertyFieldTable _properties;
JMSMapMessage() throws JMSException
{
@@ -49,10 +51,9 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa
JMSMapMessage(ByteBuffer data) throws JMSException
{
super(data); // this instantiates a content header
- _map = new PropertyFieldTable();
+ _properties = new JMSPropertyFieldTable();
}
-
JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
throws AMQException
{
@@ -62,19 +63,33 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa
{
long tableSize = EncodingUtils.readInteger(_data);
- _map = (PropertyFieldTable) FieldTableFactory.newFieldTable(_data, tableSize);
-
+ try
+ {
+ _properties = new JMSPropertyFieldTable(_data, tableSize);
+ }
+ catch (JMSException e)
+ {
+ Exception error = e.getLinkedException();
+ if (error instanceof AMQFrameDecodingException)
+ {
+ throw(AMQFrameDecodingException) error;
+ }
+ else
+ {
+ throw new AMQException(e.getMessage(), e);
+ }
+ }
}
else
{
- _map = (PropertyFieldTable) FieldTableFactory.newFieldTable();
+ _properties = new JMSPropertyFieldTable();
}
}
public String toBodyString() throws JMSException
{
- return "MapSize:" + _map.getEncodedSize() + "\nMapData:\n" + _map.toString();
+ return _properties.toString();
}
public String getMimeType()
@@ -82,85 +97,43 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa
return MIME_TYPE;
}
- // MapMessage Interface
- public boolean getBoolean(String string) throws JMSException
+ public ByteBuffer getData()
{
- Boolean b = _map.getBoolean(string);
-
- if (b == null)
- {
- if (_map.containsKey(string))
- {
- Object str = _map.getObject(string);
+ //What if _data is null?
+ _properties.writeToBuffer(_data);
+ return super.getData();
+ }
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getBoolean can't use " + string + " item.");
- }
- else
- {
- return Boolean.valueOf((String) str);
- }
- }
- else
- {
- b = Boolean.valueOf(null);
- }
- }
+ @Override
+ public void clearBodyImpl() throws JMSException
+ {
+ super.clearBodyImpl();
+ _properties.clear();
+ }
- return b;
+ public boolean getBoolean(String string) throws JMSException
+ {
+ return _properties.getBoolean(string);
}
public byte getByte(String string) throws JMSException
{
- Byte b = _map.getByte(string);
- if (b == null)
- {
- if (_map.containsKey(string))
- {
- Object str = _map.getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getByte can't use " + string + " item.");
- }
- else
- {
- return Byte.valueOf((String) str);
- }
- }
- else
- {
- b = Byte.valueOf(null);
- }
- }
-
- return b;
+ return _properties.getByte(string);
}
public short getShort(String string) throws JMSException
{
- {
- Short s = _map.getShort(string);
-
- if (s == null)
- {
- s = Short.valueOf(getByte(string));
- }
-
- return s;
- }
+ return _properties.getShort(string);
}
public char getChar(String string) throws JMSException
{
-
- Character result = _map.getCharacter(string);
+ Character result = _properties.getCharacter(string);
if (result == null)
{
- throw new MessageFormatException("getChar couldn't find " + string + " item.");
+ throw new NullPointerException("getChar couldn't find " + string + " item.");
}
else
{
@@ -170,179 +143,97 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa
public int getInt(String string) throws JMSException
{
- Integer i = _map.getInteger(string);
-
- if (i == null)
- {
- i = Integer.valueOf(getShort(string));
- }
-
- return i;
+ return _properties.getInteger(string);
}
public long getLong(String string) throws JMSException
{
-
- Long l = _map.getLong(string);
-
- if (l == null)
- {
- l = Long.valueOf(getInt(string));
- }
-
- return l;
-
+ return _properties.getLong(string);
}
public float getFloat(String string) throws JMSException
{
-
- Float f = _map.getFloat(string);
-
- if (f == null)
- {
- if (_map.containsKey(string))
- {
- Object str = _map.getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getFloat can't use " + string + " item.");
- }
- else
- {
- return Float.valueOf((String) str);
- }
- }
- else
- {
- f = Float.valueOf(null);
- }
-
- }
-
- return f;
-
+ return _properties.getFloat(string);
}
public double getDouble(String string) throws JMSException
{
- Double d = _map.getDouble(string);
-
- if (d == null)
- {
- d = Double.valueOf(getFloat(string));
- }
-
- return d;
+ return _properties.getDouble(string);
}
public String getString(String string) throws JMSException
{
- String s = _map.getString(string);
-
- if (s == null)
- {
- if (_map.containsKey(string))
- {
- Object o = _map.getObject(string);
- if (o instanceof byte[])
- {
- throw new MessageFormatException("getObject couldn't find " + string + " item.");
- }
- else
- {
- if (o == null)
- {
- return null;
- }
- else
- {
- s = String.valueOf(o);
- }
- }
- }
- }
-
- return s;
+ return _properties.getString(string);
}
public byte[] getBytes(String string) throws JMSException
{
-
- byte[] result = _map.getBytes(string);
-
- if (result == null)
- {
- throw new MessageFormatException("getBytes couldn't find " + string + " item.");
- }
-
- return result;
-
+ return _properties.getBytes(string);
}
public Object getObject(String string) throws JMSException
{
- return _map.getObject(string);
+ return _properties.getObject(string);
}
public Enumeration getMapNames() throws JMSException
{
- return _map.getPropertyNames();
+ return _properties.getMapNames();
}
+
public void setBoolean(String string, boolean b) throws JMSException
{
checkWritable();
- _map.setBoolean(string, b);
+ _properties.setBoolean(string, b);
}
public void setByte(String string, byte b) throws JMSException
{
checkWritable();
- _map.setByte(string, b);
+ _properties.setByte(string, b);
}
public void setShort(String string, short i) throws JMSException
{
checkWritable();
- _map.setShort(string, i);
+ _properties.setShort(string, i);
}
public void setChar(String string, char c) throws JMSException
{
checkWritable();
- _map.setChar(string, c);
+ _properties.setChar(string, c);
}
public void setInt(String string, int i) throws JMSException
{
checkWritable();
- _map.setInteger(string, i);
+ _properties.setInteger(string, i);
}
public void setLong(String string, long l) throws JMSException
{
checkWritable();
- _map.setLong(string, l);
+ _properties.setLong(string, l);
}
public void setFloat(String string, float v) throws JMSException
{
checkWritable();
- _map.setFloat(string, v);
+ _properties.setFloat(string, v);
}
public void setDouble(String string, double v) throws JMSException
{
checkWritable();
- _map.setDouble(string, v);
+ _properties.setDouble(string, v);
}
public void setString(String string, String string1) throws JMSException
{
checkWritable();
- _map.setString(string, string1);
+ _properties.setString(string, string1);
}
public void setBytes(String string, byte[] bytes) throws JMSException
@@ -353,25 +244,18 @@ public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessa
public void setBytes(String string, byte[] bytes, int i, int i1) throws JMSException
{
checkWritable();
- _map.setBytes(string, bytes, i, i1);
+ _properties.setBytes(string, bytes, i, i1);
}
public void setObject(String string, Object object) throws JMSException
{
checkWritable();
- _map.setObject(string, object);
+ _properties.setObject(string, object);
}
public boolean itemExists(String string) throws JMSException
{
- return _map.itemExists(string);
- }
-
- public ByteBuffer getData()
- {
- //What if _data is null?
- _map.writeToBuffer(_data);
- return super.getData();
+ return _properties.itemExists(string);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index 7393cea714..4fb070d2ff 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -72,6 +72,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
_data.release();
}
_data = null;
+
}
public String toBodyString() throws JMSException
@@ -97,6 +98,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
_data.rewind();
}
+
try
{
ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
@@ -108,6 +110,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
throw new MessageFormatException("Message not serializable: " + e);
}
+
}
public Serializable getObject() throws JMSException
@@ -120,15 +123,18 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
try
{
+ _data.rewind();
in = new ObjectInputStream(_data.asInputStream());
return (Serializable) in.readObject();
}
catch (IOException e)
- {
- throw new MessageFormatException("Could not deserialize message: " + e);
+ {
+ e.printStackTrace();
+ throw new MessageFormatException("Could not deserialize message: " + e);
}
catch (ClassNotFoundException e)
{
+ e.printStackTrace();
throw new MessageFormatException("Could not deserialize message: " + e);
}
finally
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index 04f3c5ee17..c2dfdc1b65 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -226,6 +226,10 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
byte wireType = readWireType();
try
{
+ if(wireType == NULL_STRING_TYPE){
+ throw new NullPointerException();
+ }
+
if (wireType != CHAR_TYPE)
{
_data.position(position);
@@ -428,7 +432,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
break;
case NULL_STRING_TYPE:
result = null;
- break;
+ throw new NullPointerException("data is null");
case BOOLEAN_TYPE:
checkAvailable(1);
result = String.valueOf(readBooleanImpl());
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index ab707bb51d..887850c06e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -104,6 +104,8 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
+ frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
+ frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
_state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/Message.java b/java/client/src/main/java/org/apache/qpid/jms/Message.java
new file mode 100644
index 0000000000..d73e51d755
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/jms/Message.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.*;
+
+public interface Message extends javax.jms.Message
+{
+ public void acknowledgeThis() throws JMSException;
+}
diff --git a/java/client/src/test/java/org/apache/qpid/transacted/Config.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java
index bd104e5407..bd104e5407 100644
--- a/java/client/src/test/java/org/apache/qpid/transacted/Config.java
+++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java
diff --git a/java/client/src/test/java/org/apache/qpid/transacted/Ping.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
index e0af4422a6..e0af4422a6 100644
--- a/java/client/src/test/java/org/apache/qpid/transacted/Ping.java
+++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
diff --git a/java/client/src/test/java/org/apache/qpid/transacted/Pong.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
index 13295c137a..13295c137a 100644
--- a/java/client/src/test/java/org/apache/qpid/transacted/Pong.java
+++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
diff --git a/java/client/src/test/java/org/apache/qpid/transacted/Relay.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java
index cede95e5f0..cede95e5f0 100644
--- a/java/client/src/test/java/org/apache/qpid/transacted/Relay.java
+++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java
diff --git a/java/client/src/test/java/org/apache/qpid/transacted/Start.java b/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
index 5564ed93ab..5564ed93ab 100644
--- a/java/client/src/test/java/org/apache/qpid/transacted/Start.java
+++ b/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
diff --git a/java/client/src/test/java/org/apache/qpid/weblogic/ServiceProvider.java b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java
index 71d806b338..71d806b338 100644
--- a/java/client/src/test/java/org/apache/qpid/weblogic/ServiceProvider.java
+++ b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java
diff --git a/java/client/src/test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
index a1e15258c3..a1e15258c3 100644
--- a/java/client/src/test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
+++ b/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index bd68f32c23..dc0ade76c4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -36,18 +36,21 @@ public class RecoverTest extends TestCase
{
private static final Logger _logger = Logger.getLogger(RecoverTest.class);
- static
+ protected void setUp() throws Exception
{
- String workdir = System.getProperty("QPID_WORK");
- if (workdir == null || workdir.equals(""))
- {
- String tempdir = System.getProperty("java.io.tmpdir");
- System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
- System.setProperty("QPID_WORK", tempdir);
- }
- DOMConfigurator.configure("../broker/etc/log4j.xml");
+ super.setUp();
+ TransportConnection.createVMBroker(1);
}
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ //Thread.sleep(2000);
+ }
+
+
+
public void testRecoverResendsMsgs() throws Exception
{
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
@@ -104,8 +107,74 @@ public class RecoverTest extends TestCase
con.close();
}
+
+ public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ con2.close();
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ TextMessage tm2 = (TextMessage) consumer.receive();
+ tm.acknowledge();
+ _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both");
+
+ consumer.receive();
+ consumer.receive();
+ _logger.info("Received all four messages. Calling recover with two outstanding messages");
+ // no ack for last three messages so when I call recover I expect to get three messages back
+ consumerSession.recover();
+ TextMessage tm3 = (TextMessage) consumer.receive(3000);
+ assertEquals("msg3", tm3.getText());
+
+ TextMessage tm4 = (TextMessage) consumer.receive(3000);
+ assertEquals("msg4", tm4.getText());
+
+
+ _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
+ ((org.apache.qpid.jms.Message)tm3).acknowledgeThis();
+
+ _logger.info("Calling recover");
+ // all acked so no messages to be delivered
+ consumerSession.recover();
+
+ tm4 = (TextMessage) consumer.receive(3000);
+ assertEquals("msg4", tm4.getText());
+ ((org.apache.qpid.jms.Message)tm4).acknowledgeThis();
+
+ _logger.info("Calling recover");
+ // all acked so no messages to be delivered
+ consumerSession.recover();
+
+
+ tm = (TextMessage) consumer.receiveNoWait();
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+
+ con.close();
+ }
+
+
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(RecoverTest.class));
+ return new junit.framework.TestSuite(RecoverTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
index 3fb1dd3f7d..f4efd64dbb 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
@@ -26,7 +26,6 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.FieldTableTest;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.testutil.VMBrokerSetup;
@@ -134,7 +133,11 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
{
ByteBuffer buffer = ((JMSBytesMessage) m).getData();
FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining());
- new FieldTableTest().assertEquivalent(_expected, actual);
+ for (Object o : _expected.keySet())
+ {
+ String key = (String) o;
+ assertEquals("Values for " + key + " did not match", _expected.get(key), actual.get(key));
+ }
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
index 1a469c1d12..02a98f67d9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
@@ -20,20 +20,19 @@
*/
package org.apache.qpid.test.unit.basic;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSMapMessage;
-import org.apache.qpid.testutil.VMBrokerSetup;
-import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import javax.jms.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import javax.jms.*;
-
-import junit.framework.TestCase;
-import junit.framework.Assert;
public class MapMessageTest extends TestCase implements MessageListener
{
@@ -56,7 +55,7 @@ public class MapMessageTest extends TestCase implements MessageListener
super.setUp();
try
{
- //TransportConnection.createVMBroker(1);
+ TransportConnection.createVMBroker(1);
init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
}
catch (Exception e)
@@ -69,7 +68,7 @@ public class MapMessageTest extends TestCase implements MessageListener
{
_logger.info("Tearing Down unit.basic.MapMessageTest");
super.tearDown();
- //TransportConnection.killAllVMBrokers();
+ TransportConnection.killAllVMBrokers();
}
private void init(AMQConnection connection) throws Exception
@@ -166,12 +165,12 @@ public class MapMessageTest extends TestCase implements MessageListener
testMapValues(m, count);
+ testCorrectExceptions(m);
+
testMessageWriteStatus(m);
testPropertyWriteStatus(m);
- testCorrectExceptions(m);
-
count++;
}
}
@@ -230,7 +229,7 @@ public class MapMessageTest extends TestCase implements MessageListener
m.getChar("message");
fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -320,7 +319,7 @@ public class MapMessageTest extends TestCase implements MessageListener
m.getChar("short");
fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -403,7 +402,7 @@ public class MapMessageTest extends TestCase implements MessageListener
m.getChar("long");
fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -494,7 +493,7 @@ public class MapMessageTest extends TestCase implements MessageListener
m.getChar("double");
fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -587,7 +586,7 @@ public class MapMessageTest extends TestCase implements MessageListener
m.getChar("float");
fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -671,7 +670,7 @@ public class MapMessageTest extends TestCase implements MessageListener
m.getChar("int");
fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -843,7 +842,7 @@ public class MapMessageTest extends TestCase implements MessageListener
m.getChar("bytes");
fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -928,7 +927,7 @@ public class MapMessageTest extends TestCase implements MessageListener
m.getChar("byte");
fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -1005,7 +1004,7 @@ public class MapMessageTest extends TestCase implements MessageListener
m.getChar("odd");
fail("Exception Expected.");
}
- catch (MessageFormatException nfe)
+ catch (MessageFormatException npe)
{
//normal execution
}
@@ -1249,6 +1248,6 @@ public class MapMessageTest extends TestCase implements MessageListener
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(MapMessageTest.class));
+ return new junit.framework.TestSuite(MapMessageTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
index 80af81652e..c88024f39f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,18 +19,15 @@
*/
package org.apache.qpid.test.unit.basic;
+import junit.framework.TestCase;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.testutil.VMBrokerSetup;
import javax.jms.*;
-import junit.framework.TestCase;
-
public class MultipleConnectionTest extends TestCase
{
public static final String _defaultBroker = "vm://:1";
@@ -138,6 +135,19 @@ public class MultipleConnectionTest extends TestCase
}
}
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException
{
for (int i = 0; i < receivers.length; i++)
@@ -209,6 +219,6 @@ public class MultipleConnectionTest extends TestCase
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(MultipleConnectionTest.class));
+ return new junit.framework.TestSuite(MultipleConnectionTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
new file mode 100644
index 0000000000..f4814795c4
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+
+import javax.jms.*;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class PubSubTwoConnectionTest extends TestCase
+{
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * This tests that a consumer is set up synchronously
+ * @throws Exception
+ */
+ public void testTwoConnections() throws Exception
+ {
+ Topic topic = new AMQTopic("MyTopic");
+ Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "/test_path");
+ Session session1 = con1.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ MessageProducer producer = session1.createProducer(topic);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "/test_path");
+ Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ MessageConsumer consumer = session2.createConsumer(topic);
+ con2.start();
+ producer.send(session1.createTextMessage("Hello"));
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals("Hello", tm1.getText());
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
index 04ad15da7a..903f6a9da9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
@@ -21,11 +21,8 @@
package org.apache.qpid.test.unit.basic;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.testutil.VMBrokerSetup;
import org.apache.log4j.Logger;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
index 7ffb3ca469..a0e4aa9787 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
@@ -552,16 +552,6 @@ public class BytesMessageTest extends TestCase
assertEquals((byte)0, result[2]);
}
- public void testToBodyString() throws Exception
- {
- JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
- final String testText = "This is a test";
- bm.writeUTF(testText);
- bm.reset();
- String result = bm.toBodyString();
- assertEquals(testText, result);
- }
-
public void testToBodyStringWithNull() throws Exception
{
JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
index e5458fd89e..bd4b3b3987 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
@@ -14,18 +14,16 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.test.unit.client.message;
-import junit.framework.TestCase;
import junit.framework.Assert;
-import org.apache.qpid.framing.PropertyFieldTable;
+import junit.framework.TestCase;
import org.apache.qpid.client.message.JMSMapMessage;
import org.apache.qpid.client.message.TestMessageHelper;
-import org.apache.log4j.Logger;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
@@ -104,6 +102,27 @@ public class MapMessageTest extends TestCase
{
Assert.fail("JMSException received." + e);
}
+
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+
+ mm.setString("value", null);
+ char c = mm.getChar("value");
+ fail("Expected NullPointerException");
+
+ }
+ catch (NullPointerException e)
+ {
+ ; // pass
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received." + e);
+ }
+
+
+
}
public void testDoubleLookup()
@@ -204,7 +223,7 @@ public class MapMessageTest extends TestCase
{
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.getByte("random");
- Assert.fail("MessageFormatException expected");
+ Assert.fail("NumberFormatException expected");
}
catch (NumberFormatException e)
{
@@ -331,7 +350,7 @@ public class MapMessageTest extends TestCase
{
JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
mm.getShort("random");
- Assert.fail("NumberFormatException should be received.");
+ Assert.fail("NumberFormatException should be received.");
}
catch (NumberFormatException e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
index 07eedc8bb9..bbd1870168 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
@@ -167,6 +167,12 @@ public class ObjectMessageTest extends TestCase implements MessageListener
}
+ public void testSetObjectForNull() throws Exception
+ {
+ ObjectMessage msg = session.createObjectMessage();
+ msg.setObject(null);
+ assertNull(msg.getObject());
+ }
private void send() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java
index 337b0f3bbc..64d10fb13f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,8 +22,13 @@ package org.apache.qpid.test.unit.client.message;
import org.apache.qpid.client.message.TestMessageHelper;
import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
import junit.framework.TestCase;
+import junit.framework.Assert;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
public class TextMessageTest extends TestCase
{
@@ -47,6 +52,248 @@ public class TextMessageTest extends TestCase
assertEquals(val, "Banana");
}
+
+ public void testBooleanPropertyLookup()
+ {
+ try
+ {
+ JMSTextMessage tm = TestMessageHelper.newJMSTextMessage();
+
+ tm.setBooleanProperty("value", true);
+ Assert.assertEquals(true, tm.getBooleanProperty("value"));
+ Assert.assertEquals("true", tm.getStringProperty("value"));
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received." + e);
+ }
+ }
+
+ public void testBytePropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.setByteProperty("value", Byte.MAX_VALUE);
+
+ Assert.assertEquals(Byte.MAX_VALUE, mm.getByteProperty("value"));
+ Assert.assertEquals((short) Byte.MAX_VALUE, mm.getShortProperty("value"));
+ Assert.assertEquals(Byte.MAX_VALUE, mm.getIntProperty("value"));
+ Assert.assertEquals((long) Byte.MAX_VALUE, mm.getLongProperty("value"));
+ Assert.assertEquals("" + Byte.MAX_VALUE, mm.getStringProperty("value"));
+
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received." + e);
+ }
+ }
+
+ public void testShortPropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.setShortProperty("value", Short.MAX_VALUE);
+ Assert.assertEquals(Short.MAX_VALUE, mm.getShortProperty("value"));
+ Assert.assertEquals((int) Short.MAX_VALUE, mm.getIntProperty("value"));
+ Assert.assertEquals((long) Short.MAX_VALUE, mm.getLongProperty("value"));
+ Assert.assertEquals("" + Short.MAX_VALUE, mm.getStringProperty("value"));
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received." + e);
+ }
+ }
+
+ public void testDoublePropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.setDoubleProperty("value", Double.MAX_VALUE);
+ Assert.assertEquals(Double.MAX_VALUE, mm.getDoubleProperty("value"));
+ Assert.assertEquals("" + Double.MAX_VALUE, mm.getStringProperty("value"));
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received." + e);
+ }
+ }
+
+ public void testFloatPropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.setFloatProperty("value", Float.MAX_VALUE);
+ Assert.assertEquals(Float.MAX_VALUE, mm.getFloatProperty("value"));
+ Assert.assertEquals((double) Float.MAX_VALUE, mm.getDoubleProperty("value"));
+ Assert.assertEquals("" + Float.MAX_VALUE, mm.getStringProperty("value"));
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received." + e);
+ }
+ }
+
+ public void testIntPropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.setIntProperty("value", Integer.MAX_VALUE);
+ Assert.assertEquals(Integer.MAX_VALUE, mm.getIntProperty("value"));
+ Assert.assertEquals((long) Integer.MAX_VALUE, mm.getLongProperty("value"));
+ Assert.assertEquals("" + Integer.MAX_VALUE, mm.getStringProperty("value"));
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received." + e);
+ }
+ }
+
+ public void testLongPropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.setLongProperty("value", Long.MAX_VALUE);
+ Assert.assertEquals(Long.MAX_VALUE, mm.getLongProperty("value"));
+ Assert.assertEquals("" + Long.MAX_VALUE, mm.getStringProperty("value"));
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received." + e);
+ }
+ }
+
+
+ // Failed Lookups
+
+ public void testFailedBooleanPropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ Assert.assertEquals(false, mm.getBooleanProperty("int"));
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received." + e);
+ }
+ }
+
+ public void testFailedBytePropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.getByteProperty("random");
+ Assert.fail("NumberFormatException expected");
+ }
+ catch (NumberFormatException e)
+ {
+ //normal execution
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received:" + e);
+ }
+
+ }
+
+ public void testFailedDoublePropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.getDoubleProperty("random");
+ Assert.fail("NullPointerException should be received.");
+ }
+ catch (NullPointerException e)
+ {
+ //normal execution
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received:" + e);
+ }
+ }
+
+ public void testFailedFloatPropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.getFloatProperty("random");
+ Assert.fail("NullPointerException should be received.");
+ }
+ catch (NullPointerException e)
+ {
+ //normal execution
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received:" + e);
+ }
+ }
+
+ public void testFailedIntPropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.getIntProperty("random");
+ Assert.fail("NumberFormatException should be received.");
+ }
+ catch (NumberFormatException e)
+ {
+ //normal execution
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received:" + e);
+ }
+ }
+
+ public void testFailedLongPropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.getLongProperty("random");
+ Assert.fail("NumberFormatException should be received.");
+ }
+ catch (NumberFormatException e)
+ {
+ //normal execution
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received:" + e);
+ }
+ }
+
+ public void testFailedShortPropertyLookup()
+ {
+ try
+ {
+ JMSMapMessage mm = TestMessageHelper.newJMSMapMessage();
+ mm.getShortProperty("random");
+ Assert.fail("NumberFormatException should be received.");
+ }
+ catch (NumberFormatException e)
+ {
+ //normal execution
+ }
+ catch (JMSException e)
+ {
+ Assert.fail("JMSException received:" + e);
+ }
+ }
+
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(TextMessageTest.class);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
new file mode 100644
index 0000000000..1e13962f01
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
@@ -0,0 +1,80 @@
+package org.apache.qpid.test.unit.client.temporaryqueue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+
+public class TemporaryQueueTest extends TestCase
+{
+
+ String _broker = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+
+ protected Connection createConnection() throws AMQException, URLSyntaxException
+ {
+ return new AMQConnection(_broker, "guest", "guest",
+ "fred", "/test");
+ }
+
+ public void testTempoaryQueue() throws Exception
+ {
+ Connection conn = createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
+ producer.send(session.createTextMessage("hello"));
+ TextMessage tm = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm);
+ assertEquals("hello",tm.getText());
+
+ try
+ {
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
+ }
+ catch(JMSException je)
+ {
+ ; //pass
+ }
+
+ consumer.close();
+
+ try
+ {
+ queue.delete();
+ }
+ catch(JMSException je)
+ {
+ fail("Unexpected Exception: " + je.getMessage());
+ }
+
+ conn.close();
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TemporaryQueueTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 50944730c3..315ba6ae4c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -73,7 +73,7 @@ public class StreamMessageTest extends TestCase
MessageProducer mandatoryProducer = producerSession.createProducer(queue);
// Third test - should be routed
- _logger.info("Sending routable message");
+ _logger.info("Sending isBound message");
StreamMessage msg = producerSession.createStreamMessage();
msg.setStringProperty("F1000","1");
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index fa46a4bcfb..14ceaa75f1 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -26,10 +26,8 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
-import javax.jms.TopicSession;
-import javax.jms.TextMessage;
-import javax.jms.TopicPublisher;
-import javax.jms.MessageConsumer;
+import javax.jms.*;
+
/**
* @author Apache Software Foundation
@@ -46,12 +44,126 @@ public class TopicSessionTest extends TestCase
{
super.tearDown();
TransportConnection.killAllVMBrokers();
+ //Thread.sleep(2000);
}
- public void testTextMessageCreation() throws Exception
+
+ public void testTopicSubscriptionUnsubscription() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0");
+ TopicPublisher publisher = session1.createPublisher(topic);
+
+ con.start();
+
+ TextMessage tm = session1.createTextMessage("Hello");
+ publisher.publish(tm);
+
+ tm = (TextMessage) sub.receive(2000);
+ assertNotNull(tm);
+
+ session1.unsubscribe("subscription0");
+
+ try
+ {
+ session1.unsubscribe("not a subscription");
+ fail("expected InvalidDestinationException when unsubscribing from unknown subscription");
+ }
+ catch(InvalidDestinationException e)
+ {
+ ; // PASS
+ }
+ catch(Exception e)
+ {
+ fail("expected InvalidDestinationException when unsubscribing from unknown subscription, got: " + e);
+ }
+
+ con.close();
+ }
+
+ public void testSubscriptionNameReuseForDifferentTopicSingleConnection() throws Exception
+ {
+ subscriptionNameReuseForDifferentTopic(false);
+ }
+
+ public void testSubscriptionNameReuseForDifferentTopicTwoConnections() throws Exception
+ {
+ subscriptionNameReuseForDifferentTopic(true);
+ }
+
+ private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception
+ {
+ AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown));
+ AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown));
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
+ TopicPublisher publisher = session1.createPublisher(null);
+
+ con.start();
+
+ publisher.publish(topic, session1.createTextMessage("hello"));
+ TextMessage m = (TextMessage) sub.receive(2000);
+ assertNotNull(m);
+
+ if (shutdown)
+ {
+ session1.close();
+ con.close();
+ con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ con.start();
+ session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ publisher = session1.createPublisher(null);
+ }
+ TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0");
+ publisher.publish(topic, session1.createTextMessage("hello"));
+ if (!shutdown)
+ {
+ m = (TextMessage) sub.receive(2000);
+ assertNull(m);
+ }
+ publisher.publish(topic2, session1.createTextMessage("goodbye"));
+ m = (TextMessage) sub2.receive(2000);
+ assertNotNull(m);
+ assertEquals("goodbye", m.getText());
+ con.close();
+ }
+
+ public void testUnsubscriptionAfterConnectionClose() throws Exception
+ {
+ AMQTopic topic = new AMQTopic("MyTopic3");
+ AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicPublisher publisher = session1.createPublisher(topic);
+
+ AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+ TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
+
+ con2.start();
+
+ publisher.publish(session1.createTextMessage("Hello"));
+ TextMessage tm = (TextMessage) sub.receive(2000);
+ assertNotNull(tm);
+ con2.close();
+ publisher.publish(session1.createTextMessage("Hello2"));
+ con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+ session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ sub = session2.createDurableSubscriber(topic, "subscription0");
+ con2.start();
+ tm = (TextMessage) sub.receive(2000);
+ assertNotNull(tm);
+ assertEquals("Hello2", tm.getText());
+ con1.close();
+ con2.close();
+ }
+
+ public void testTextMessageCreation() throws Exception
+ {
+ AMQTopic topic = new AMQTopic("MyTopic4");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
@@ -85,8 +197,83 @@ public class TopicSessionTest extends TestCase
tm = (TextMessage) consumer1.receive(2000);
assertNotNull(tm);
assertEquals("Empty string not returned", "", msgText);
+ con.close();
}
+ public void testSendingSameMessage() throws Exception
+ {
+ AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryTopic topic = session.createTemporaryTopic();
+ assertNotNull(topic);
+ TopicPublisher producer = session.createPublisher(topic);
+ MessageConsumer consumer = session.createConsumer(topic);
+ conn.start();
+ TextMessage sentMessage = session.createTextMessage("Test Message");
+ producer.send(sentMessage);
+ TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
+ assertNotNull(receivedMessage);
+ assertEquals(sentMessage.getText(),receivedMessage.getText());
+ producer.send(sentMessage);
+ receivedMessage = (TextMessage) consumer.receive(2000);
+ assertNotNull(receivedMessage);
+ assertEquals(sentMessage.getText(),receivedMessage.getText());
+
+
+ }
+
+ public void testTemporaryTopic() throws Exception
+ {
+ AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryTopic topic = session.createTemporaryTopic();
+ assertNotNull(topic);
+ TopicPublisher producer = session.createPublisher(topic);
+ MessageConsumer consumer = session.createConsumer(topic);
+ conn.start();
+ producer.send(session.createTextMessage("hello"));
+ TextMessage tm = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm);
+ assertEquals("hello",tm.getText());
+
+ try
+ {
+ topic.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
+ }
+ catch(JMSException je)
+ {
+ ; //pass
+ }
+
+ consumer.close();
+
+ try
+ {
+ topic.delete();
+ }
+ catch(JMSException je)
+ {
+ fail("Unexpected Exception: " + je.getMessage());
+ }
+
+ TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ MessageConsumer consumer2 = session2.createConsumer(topic);
+ fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session");
+ }
+ catch (JMSException je)
+ {
+ ; // pass
+ }
+
+
+
+ conn.close();
+ }
+
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(TopicSessionTest.class);
diff --git a/java/client/src/old_test/java/org/apache/qpid/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index e858e1ad36..90a11307b8 100644
--- a/java/client/src/old_test/java/org/apache/qpid/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -23,8 +23,6 @@ package org.apache.qpid.test.unit.transacted;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.testutil.VMBrokerSetup;
import javax.jms.*;