summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-06-26 16:43:58 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-06-26 16:43:58 +0000
commitd2a9e42e20edbfd0db53c75a4f0511547ec70319 (patch)
tree7656b45795eb1aa79d36596090e64bb092d7a354
parent37354109282201cc38b2734ab3b33fb5eeafca5c (diff)
downloadqpid-python-d2a9e42e20edbfd0db53c75a4f0511547ec70319.tar.gz
QPID-509 Mandatory messages not returned outside a transaction. They are now.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@550849 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java307
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java102
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java74
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java139
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java24
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java141
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java134
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java27
10 files changed, 514 insertions, 438 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 52e9505a32..77546d3134 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,17 +20,8 @@
*/
package org.apache.qpid.server;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
@@ -53,6 +44,16 @@ import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
public class AMQChannel
{
public static final int DEFAULT_PREFETCH = 5000;
@@ -61,7 +62,7 @@ public class AMQChannel
private final int _channelId;
- //private boolean _transactional;
+ // private boolean _transactional;
private long _prefetch_HighWaterMark;
@@ -113,13 +114,12 @@ public class AMQChannel
private Set<Long> _browsedAcks = new HashSet<Long>();
- //Why do we need this reference ? - ritchiem
+ // Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
private boolean _closing;
-
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
- throws AMQException
+ throws AMQException
{
_session = session;
_channelId = channelId;
@@ -166,7 +166,6 @@ public class AMQChannel
return _prefetchSize;
}
-
public void setPrefetchSize(long prefetchSize)
{
_prefetchSize = prefetchSize;
@@ -192,18 +191,15 @@ public class AMQChannel
_prefetch_HighWaterMark = prefetchCount;
}
-
public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
{
-
- _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
- _txnContext);
+ _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext);
_currentMessage.setPublisher(publisher);
}
- public void publishContentHeader(ContentHeaderBody contentHeaderBody)
- throws AMQException
+ public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
+ throws AMQException
{
if (_currentMessage == null)
{
@@ -215,6 +211,7 @@ public class AMQChannel
{
_log.trace(debugIdentity() + "Content header received on channel " + _channelId);
}
+
_currentMessage.setContentHeaderBody(contentHeaderBody);
_currentMessage.setExpiration();
@@ -224,13 +221,13 @@ public class AMQChannel
// check and deliver if header says body length is zero
if (contentHeaderBody.bodySize == 0)
{
+ _txnContext.messageProcessed(protocolSession);
_currentMessage = null;
}
}
}
- public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession)
- throws AMQException
+ public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) throws AMQException
{
if (_currentMessage == null)
{
@@ -241,12 +238,15 @@ public class AMQChannel
{
_log.trace(debugIdentity() + "Content body received on channel " + _channelId);
}
+
try
{
// returns true iff the message was delivered (i.e. if all data was
// received
- if (_currentMessage.addContentBodyFrame(_storeContext, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody)))
+ if (_currentMessage.addContentBodyFrame(_storeContext,
+ protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
+ contentBody)))
{
// callback to allow the context to do any post message processing
// primary use is to allow message return processing in the non-tx case
@@ -303,12 +303,13 @@ public class AMQChannel
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
tag = new AMQShortString("sgen_" + getNextConsumerTag());
}
+
if (_consumerTag2QueueMap.containsKey(tag))
{
throw new ConsumerTagNotUniqueException();
@@ -316,29 +317,28 @@ public class AMQChannel
queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
_consumerTag2QueueMap.put(tag, queue);
+
return tag;
}
-
public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
{
if (_log.isDebugEnabled())
{
_log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
-
- public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- _log.debug(message);
- return true;
- }
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug(message);
- public void visitComplete()
- {
- }
- });
+ return true;
+ }
+
+ public void visitComplete()
+ { }
+ });
}
AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
@@ -382,14 +382,17 @@ public class AMQChannel
_log.info("No consumers to unsubscribe on channel " + toString());
}
}
+
for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
{
if (_log.isInfoEnabled())
{
_log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
+
me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
}
+
_consumerTag2QueueMap.clear();
}
@@ -414,8 +417,8 @@ public class AMQChannel
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag +
- ") with a queue(" + queue + ") for " + consumerTag);
+ _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag
+ + ") with a queue(" + queue + ") for " + consumerTag);
}
}
}
@@ -458,10 +461,10 @@ public class AMQChannel
if (!(_txnContext instanceof NonTransactionalContext))
{
-// if (_nonTransactedContext == null)
+ // if (_nonTransactedContext == null)
{
- _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _nonTransactedContext =
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
@@ -472,7 +475,6 @@ public class AMQChannel
}
}
-
for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
if (unacked.queue != null)
@@ -488,7 +490,7 @@ public class AMQChannel
// Should we allow access To the DM to directy deliver the message?
// As we don't need to check for Consumers or worry about incrementing the message count?
-// unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
+ // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
}
}
@@ -522,10 +524,10 @@ public class AMQChannel
TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
-// if (_nonTransactedContext == null)
+ // if (_nonTransactedContext == null)
{
- _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _nonTransactedContext =
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
@@ -537,51 +539,49 @@ public class AMQChannel
if (unacked.queue != null)
{
- //Redeliver the messages to the front of the queue
+ // Redeliver the messages to the front of the queue
deliveryContext.deliver(unacked.message, unacked.queue, true);
- //Deliver increments the message count but we have already deliverted this once so don't increment it again
+ // Deliver increments the message count but we have already deliverted this once so don't increment it again
// this was because deliver did an increment changed this.
}
else
{
- _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag +
- " but no queue defined and no DeadLetter queue so DROPPING message.");
-// _log.error("Requested requeue of message:" + deliveryTag +
-// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
-//
-// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
-//
+ _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity()
+ + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
+ // _log.error("Requested requeue of message:" + deliveryTag +
+ // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
+ //
+ // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
+ //
}
}
else
{
- _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size());
+ _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
+ + _unacknowledgedMessageMap.size());
if (_log.isDebugEnabled())
{
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- int count = 0;
-
- public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" +
- "[" + message.deliveryTag + "]");
- return false; // Continue
- }
+ int count = 0;
- public void visitComplete()
- {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug(
+ (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]");
- }
- });
+ return false; // Continue
+ }
+
+ public void visitComplete()
+ { }
+ });
}
}
-
}
-
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
@@ -603,54 +603,53 @@ public class AMQChannel
// Marking messages who still have a consumer for to be resent
// and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- AMQShortString consumerTag = message.consumerTag;
- AMQMessage msg = message.message;
- msg.setRedelivered(true);
- if (consumerTag != null)
- {
- // Consumer exists
- if (_consumerTag2QueueMap.containsKey(consumerTag))
- {
- msgToResend.add(message);
- }
- else // consumer has gone
- {
- msgToRequeue.add(message);
- }
- }
- else
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- // Message has no consumer tag, so was "delivered" to a GET
- // or consumer no longer registered
- // cannot resend, so re-queue.
- if (message.queue != null)
+ AMQShortString consumerTag = message.consumerTag;
+ AMQMessage msg = message.message;
+ msg.setRedelivered(true);
+ if (consumerTag != null)
{
- if (requeue)
+ // Consumer exists
+ if (_consumerTag2QueueMap.containsKey(consumerTag))
{
- msgToRequeue.add(message);
+ msgToResend.add(message);
}
- else
+ else // consumer has gone
{
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+ msgToRequeue.add(message);
}
}
else
{
- _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+ // Message has no consumer tag, so was "delivered" to a GET
+ // or consumer no longer registered
+ // cannot resend, so re-queue.
+ if (message.queue != null)
+ {
+ if (requeue)
+ {
+ msgToRequeue.add(message);
+ }
+ else
+ {
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+ }
+ }
+ else
+ {
+ _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+ }
}
- }
- // false means continue processing
- return false;
- }
+ // false means continue processing
+ return false;
+ }
- public void visitComplete()
- {
- }
- });
+ public void visitComplete()
+ { }
+ });
// Process Messages to Resend
if (_log.isInfoEnabled())
@@ -664,6 +663,7 @@ public class AMQChannel
_log.info("No message to resend.");
}
}
+
for (UnacknowledgedMessage message : msgToResend)
{
AMQMessage msg = message.message;
@@ -672,22 +672,21 @@ public class AMQChannel
// If the client has requested the messages be resent then it is
// their responsibility to ensure that thay are capable of receiving them
// i.e. The channel hasn't been server side suspended.
-// if (isSuspended())
-// {
-// _log.info("Channel is suspended so requeuing");
-// //move this message to requeue
-// msgToRequeue.add(message);
-// }
-// else
-// {
- //release to allow it to be delivered
+ // if (isSuspended())
+ // {
+ // _log.info("Channel is suspended so requeuing");
+ // //move this message to requeue
+ // msgToRequeue.add(message);
+ // }
+ // else
+ // {
+ // release to allow it to be delivered
msg.release(message.queue);
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
msg.setRedelivered(true);
-
Subscription sub = msg.getDeliveredSubscription(message.queue);
if (sub != null)
@@ -704,17 +703,20 @@ public class AMQChannel
{
if (_log.isDebugEnabled())
{
- _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message");
+ _log.debug("Subscription(" + System.identityHashCode(sub)
+ + ") closed during resend so requeuing message");
}
- //move this message to requeue
+ // move this message to requeue
msgToRequeue.add(message);
}
else
{
if (_log.isDebugEnabled())
{
- _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub));
+ _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:"
+ + System.identityHashCode(sub));
}
+
sub.addToResendQueue(msg);
_unacknowledgedMessageMap.remove(message.deliveryTag);
}
@@ -725,13 +727,14 @@ public class AMQChannel
if (_log.isInfoEnabled())
{
- _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss");
+ _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
+ + ")to prevent loss");
}
- //move this message to requeue
+ // move this message to requeue
msgToRequeue.add(message);
}
} // for all messages
-// } else !isSuspend
+ // } else !isSuspend
if (_log.isInfoEnabled())
{
@@ -748,8 +751,8 @@ public class AMQChannel
{
if (_nonTransactedContext == null)
{
- _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _nonTransactedContext =
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
@@ -783,29 +786,29 @@ public class AMQChannel
public void queueDeleted(final AMQQueue queue) throws AMQException
{
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- if (message.queue == queue)
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- try
- {
- message.discard(_storeContext);
- message.queue = null;
- }
- catch (AMQException e)
+ if (message.queue == queue)
{
- _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " +
- e, e);
+ try
+ {
+ message.discard(_storeContext);
+ message.queue = null;
+ }
+ catch (AMQException e)
+ {
+ _log.error(
+ "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e);
+ }
}
+
+ return false;
}
- return false;
- }
- public void visitComplete()
- {
- }
- });
+ public void visitComplete()
+ { }
+ });
}
/**
@@ -834,6 +837,7 @@ public class AMQChannel
}
}
+
checkSuspension();
}
@@ -851,8 +855,9 @@ public class AMQChannel
{
boolean suspend;
- suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
- || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+ suspend =
+ ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
+ || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
setSuspended(suspend);
}
@@ -873,7 +878,7 @@ public class AMQChannel
if (wasSuspended)
{
_log.debug("Unsuspending channel " + this);
- //may need to deliver queued messages
+ // may need to deliver queued messages
for (AMQQueue q : _consumerTag2QueueMap.values())
{
q.deliverAsync();
@@ -897,6 +902,7 @@ public class AMQChannel
{
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
+
_txnContext.commit();
}
@@ -911,6 +917,7 @@ public class AMQChannel
sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional());
sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
sb.append("/").append(_prefetch_HighWaterMark);
+
return sb.toString();
}
@@ -934,14 +941,13 @@ public class AMQChannel
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
AMQMessage message = bouncedMessage.getAMQMessage();
- session.getProtocolOutputConverter().writeReturn(message, _channelId,
- bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
}
+
_returnMessages.clear();
}
-
public boolean wouldSuspend(AMQMessage msg)
{
if (isSuspended())
@@ -950,19 +956,20 @@ public class AMQChannel
}
else
{
- boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+ boolean willSuspend =
+ ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
if (!willSuspend)
{
final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
- willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize);
+ willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < (msg.getSize() + unackedSize));
}
-
if (willSuspend)
{
setSuspended(true);
}
+
return willSuspend;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index ab103fbd2a..a3aa6e7f5d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -169,7 +169,7 @@ public class DestNameExchange extends AbstractExchange
if (queues == null || queues.isEmpty())
{
String msg = "Routing key " + routingKey + " is not known to " + this;
- if (info.isMandatory())
+ if (info.isMandatory() || info.isImmediate())
{
throw new NoRouteException(msg, payload);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index 0202ccb762..418cf64c56 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.LinkedList;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -41,24 +46,21 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
public class DestWildExchange extends AbstractExchange
{
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
- private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
- // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
+ private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private static final String TOPIC_SEPARATOR = ".";
private static final String AMQP_STAR = "*";
private static final String AMQP_HASH = "#";
@@ -90,7 +92,7 @@ public class DestWildExchange extends AbstractExchange
queueList.add(q.getName().toString());
}
- Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])};
+ Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -118,7 +120,6 @@ public class DestWildExchange extends AbstractExchange
} // End of MBean class
-
public AMQShortString getType()
{
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -140,6 +141,7 @@ public class DestWildExchange extends AbstractExchange
{
queueList = _routingKey2queues.get(routingKey);
}
+
if (!queueList.contains(queue))
{
queueList.add(queue);
@@ -165,8 +167,8 @@ public class DestWildExchange extends AbstractExchange
for (int index = 0; index < size; index++)
{
- //if there are more levels
- if (index + 1 < size)
+ // if there are more levels
+ if ((index + 1) < size)
{
if (_subscription.get(index).equals(AMQP_HASH))
{
@@ -175,7 +177,7 @@ public class DestWildExchange extends AbstractExchange
// we don't need #.# delete this one
_subscription.remove(index);
size--;
- //redo this normalisation
+ // redo this normalisation
index--;
}
@@ -186,7 +188,7 @@ public class DestWildExchange extends AbstractExchange
_subscription.add(index + 1, _subscription.remove(index));
}
}
- }//if we have more levels
+ } // if we have more levels
}
StringBuilder sb = new StringBuilder();
@@ -211,9 +213,9 @@ public class DestWildExchange extends AbstractExchange
List<AMQQueue> queues = getMatchedQueues(routingKey);
// if we have no registered queues we have nothing to do
// TODO: add support for the immediate flag
- if (queues == null || queues.size() == 0)
+ if ((queues == null) || queues.isEmpty())
{
- if (info.isMandatory())
+ if (info.isMandatory() || info.isImmediate())
{
String msg = "Topic " + routingKey + " is not known to " + this;
throw new NoRouteException(msg, payload);
@@ -222,6 +224,7 @@ public class DestWildExchange extends AbstractExchange
{
_logger.warn("No queues found for routing key " + routingKey);
_logger.warn("Routing map contains: " + _routingKey2queues);
+
return;
}
}
@@ -238,14 +241,15 @@ public class DestWildExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
- return queues != null && queues.contains(queue);
- }
+ return (queues != null) && queues.contains(queue);
+ }
public boolean isBound(AMQShortString routingKey) throws AMQException
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
- return queues != null && !queues.isEmpty();
+
+ return (queues != null) && !queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
@@ -257,6 +261,7 @@ public class DestWildExchange extends AbstractExchange
return true;
}
}
+
return false;
}
@@ -275,16 +280,18 @@ public class DestWildExchange extends AbstractExchange
List<AMQQueue> queues = _routingKey2queues.get(routingKey);
if (queues == null)
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- " with routing key " + routingKey + ". No queue was registered with that routing key");
+ throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName()
+ + " with routing key " + routingKey + ". No queue was registered with that routing key");
}
+
boolean removedQ = queues.remove(queue);
if (!removedQ)
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- " with routing key " + routingKey);
+ throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName()
+ + " with routing key " + routingKey);
}
+
if (queues.isEmpty())
{
_routingKey2queues.remove(routingKey);
@@ -304,7 +311,6 @@ public class DestWildExchange extends AbstractExchange
}
}
-
private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
{
List<AMQQueue> list = new LinkedList<AMQQueue>();
@@ -334,7 +340,6 @@ public class DestWildExchange extends AbstractExchange
queueList.add(queTok.nextToken());
}
-
int depth = 0;
boolean matching = true;
boolean done = false;
@@ -343,25 +348,26 @@ public class DestWildExchange extends AbstractExchange
while (matching && !done)
{
- if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip)
+ if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip)))
{
done = true;
// if it was the routing key that ran out of digits
- if (routingkeyList.size() == depth + routingskip)
+ if (routingkeyList.size() == (depth + routingskip))
{
if (queueList.size() > (depth + queueskip))
- { // a hash and it is the last entry
- matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1;
+ { // a hash and it is the last entry
+ matching =
+ queueList.get(depth + queueskip).equals(AMQP_HASH)
+ && (queueList.size() == (depth + queueskip + 1));
}
}
- else if (routingkeyList.size() > depth + routingskip)
+ else if (routingkeyList.size() > (depth + routingskip))
{
// There is still more routing key to check
matching = false;
}
-
continue;
}
@@ -377,27 +383,33 @@ public class DestWildExchange extends AbstractExchange
else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
{
// Is this a # at the end
- if (queueList.size() == depth + queueskip + 1)
+ if (queueList.size() == (depth + queueskip + 1))
{
done = true;
+
continue;
}
// otherwise # in the middle
- while (routingkeyList.size() > depth + routingskip)
+ while (routingkeyList.size() > (depth + routingskip))
{
if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
{
queueskip++;
depth++;
+
break;
}
+
routingskip++;
}
+
continue;
}
+
matching = false;
}
+
depth++;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index d20e3fa27b..5a6301548b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -1,27 +1,36 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
-
package org.apache.qpid.server.exchange;
-import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -36,16 +45,7 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
{
@@ -63,7 +63,7 @@ public class FanoutExchange extends AbstractExchange
private final class FanoutExchangeMBean extends ExchangeMBean
{
@MBeanConstructor("Creates an MBean for AMQ fanout exchange")
- public FanoutExchangeMBean() throws JMException
+ public FanoutExchangeMBean() throws JMException
{
super();
_exchangeType = "fanout";
@@ -79,9 +79,7 @@ public class FanoutExchange extends AbstractExchange
{
String queueName = queue.getName().toString();
-
-
- Object[] bindingItemValues = {queueName, new String[] {queueName}};
+ Object[] bindingItemValues = { queueName, new String[] { queueName } };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -98,7 +96,7 @@ public class FanoutExchange extends AbstractExchange
}
try
- {
+ {
queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
}
catch (AMQException ex)
@@ -107,8 +105,7 @@ public class FanoutExchange extends AbstractExchange
}
}
- }// End of MBean class
-
+ } // End of MBean class
protected ExchangeMBean createMBean() throws AMQException
{
@@ -147,11 +144,9 @@ public class FanoutExchange extends AbstractExchange
{
assert queue != null;
-
if (!_queues.remove(queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- ". ");
+ throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + ". ");
}
}
@@ -159,10 +154,10 @@ public class FanoutExchange extends AbstractExchange
{
final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
final AMQShortString routingKey = publishInfo.getRoutingKey();
- if (_queues == null || _queues.isEmpty())
+ if ((_queues == null) || _queues.isEmpty())
{
String msg = "No queues bound to " + this;
- if (publishInfo.isMandatory())
+ if (publishInfo.isMandatory() || publishInfo.isImmediate())
{
throw new NoRouteException(msg, payload);
}
@@ -193,13 +188,12 @@ public class FanoutExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey) throws AMQException
{
- return _queues != null && !_queues.isEmpty();
+ return (_queues != null) && !_queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
{
-
return _queues.contains(queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index b4b2bc20bc..b5c03a7291 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -229,7 +229,7 @@ public class HeadersExchange extends AbstractExchange
String msg = "Exchange " + getName() + ": message not routable.";
- if (payload.getMessagePublishInfo().isMandatory())
+ if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate())
{
throw new NoRouteException(msg, payload);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index d430f1af94..5bfd47b469 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,24 +20,13 @@
*/
package org.apache.qpid.server.protocol;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.security.Principal;
-
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-
import org.apache.log4j.Logger;
+
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
@@ -46,22 +35,34 @@ import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public class AMQMinaProtocolSession implements AMQProtocolSession,
- Managable
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -111,25 +112,20 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
-
public ManagedObject getManagedObject()
{
return _managedObject;
}
-
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
- AMQCodecFactory codecFactory)
- throws AMQException
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
+ throws AMQException
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
-
_codecFactory = codecFactory;
-
try
{
IoServiceConfig config = session.getServiceConfig();
@@ -140,16 +136,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
catch (RuntimeException e)
{
e.printStackTrace();
- // throw e;
+ // throw e;
}
-// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
+ // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
- AMQCodecFactory codecFactory, AMQStateManager stateManager)
- throws AMQException
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
+ AMQStateManager stateManager) throws AMQException
{
_stateManager = stateManager;
_minaProtocolSession = session;
@@ -182,8 +177,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
- public void dataBlockReceived(AMQDataBlock message)
- throws Exception
+ public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
if (message instanceof ProtocolInitiation)
@@ -203,8 +197,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- private void frameReceived(AMQFrame frame)
- throws AMQException
+ private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
@@ -252,13 +245,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
String locales = "en_US";
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short) getProtocolMajorVersion(), // versionMajor
- (short) getProtocolMinorVersion()); // versionMinor
+ AMQFrame response =
+ ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short) getProtocolMajorVersion(), // versionMajor
+ (short) getProtocolMinorVersion()); // versionMinor
_minaProtocolSession.write(response);
}
catch (AMQException e)
@@ -269,21 +262,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
// TODO: Close connection (but how to wait until message is sent?)
// ritchiem 2006-12-04 will this not do?
-// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
-// future.join();
-// close connection
+ // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
+ // future.join();
+ // close connection
}
}
-
private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId,
- methodBody);
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
- //Check that this channel is not closing
+ // Check that this channel is not closing
if (channelAwaitingClosure(channelId))
{
if ((evt.getMethod() instanceof ChannelCloseOkBody))
@@ -299,11 +290,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.info("Channel[" + channelId + "] awaiting closure ignoring");
}
+
return;
}
}
-
try
{
try
@@ -315,10 +306,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
for (AMQMethodListener listener : _frameListeners)
{
- wasAnyoneInterested = listener.methodReceived(evt) ||
- wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
}
+
if (!wasAnyoneInterested)
{
throw new AMQNoMethodHandlerException(evt);
@@ -332,6 +323,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.info("Closing channel due to: " + e.getMessage());
}
+
writeFrame(e.getCloseFrame(channelId));
closeChannel(channelId);
}
@@ -341,14 +333,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
}
+
if (_logger.isInfoEnabled())
{
_logger.info("Closing connection due to: " + e.getMessage());
}
+
closeSession();
- AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
- AMQConstant.CHANNEL_ERROR.getName().toString());
+ AMQConnectionException ce =
+ evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString());
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(ce.getCloseFrame(channelId));
@@ -360,6 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.info("Closing connection due to: " + e.getMessage());
}
+
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(channelId));
@@ -372,17 +368,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
listener.error(e);
}
+
_minaProtocolSession.close();
}
}
-
private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
- channel.publishContentHeader(body);
+ channel.publishContentHeader(body, this);
}
@@ -427,15 +423,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
}
+
return channel;
}
public AMQChannel getChannel(int channelId) throws AMQException
{
- final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId)
- ? _cachedChannels[channelId]
- : _channelMap.get(channelId);
- if (channel == null || channel.isClosing())
+ final AMQChannel channel =
+ ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+ if ((channel == null) || channel.isClosing())
{
return null;
}
@@ -466,8 +462,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
if (_channelMap.size() == _maxNoOfChannels)
{
- String errorMessage = toString() + ": maximum number of channels has been reached (" +
- _maxNoOfChannels + "); can't create channel";
+ String errorMessage =
+ toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+ + "); can't create channel";
_logger.error(errorMessage);
throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
}
@@ -480,6 +477,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_cachedChannels[channelId] = channel;
}
+
checkForNotification();
}
@@ -504,7 +502,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void commitTransactions(AMQChannel channel) throws AMQException
{
- if (channel != null && channel.isTransactional())
+ if ((channel != null) && channel.isTransactional())
{
channel.commit();
}
@@ -512,7 +510,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void rollbackTransactions(AMQChannel channel) throws AMQException
{
- if (channel != null && channel.isTransactional())
+ if ((channel != null) && channel.isTransactional())
{
channel.rollback();
}
@@ -597,6 +595,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
channel.close(this);
}
+
_channelMap.clear();
for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
{
@@ -615,6 +614,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_managedObject.unregister();
}
+
for (Task task : _taskList)
{
task.doTask(this);
@@ -687,6 +687,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
}
+
if (_clientProperties.getString(ClientProperties.version.toString()) != null)
{
_clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
@@ -715,7 +716,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public boolean isProtocolVersion(byte major, byte minor)
{
- return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor;
+ return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
}
public VersionSpecificRegistry getRegistry()
@@ -723,13 +724,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _registry;
}
-
public Object getClientIdentifier()
{
return _minaProtocolSession.getRemoteAddress();
}
-
public VirtualHost getVirtualHost()
{
return _virtualHost;
@@ -769,6 +768,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public String getClientVersion()
{
- return _clientVersion == null ? null : _clientVersion.toString();
+ return (_clientVersion == null) ? null : _clientVersion.toString();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 66a14f3bfb..1e1eaa2813 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -20,16 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.text.MessageFormat;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.management.JMException;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -44,6 +34,16 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.management.JMException;
+
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
* fully in RFC 006.
@@ -590,7 +590,7 @@ public class AMQQueue implements Managable, Comparable
delete();
}
- public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+ /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
// fixme not sure what this is doing. should we be passing deliverFirst through here?
// This code is not used so when it is perhaps it should
@@ -606,7 +606,7 @@ public class AMQQueue implements Managable, Comparable
// from the queue:
dequeue(storeContext, msg);
}
- }
+ }*/
// public DeliveryManager getDeliveryManager()
// {
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
index c8a244743e..048fcfb0b3 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
@@ -25,6 +25,7 @@ import junit.framework.TestCase;
import org.apache.log4j.NDC;
import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*;
@@ -113,70 +114,70 @@ public class ImmediateMessageTest extends TestCase
testClients.testNoExceptions(testProps);
}
- /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerNoTxP2P() throws Exception
+ /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, false);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
// Send one message and get a linked no consumers exception.
testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
- /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerTxP2P() throws Exception
+ /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, false);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
// Send one message and get a linked no consumers exception.
testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
- /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
- public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P() throws Exception
+ /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteNoTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, false);
- PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- // Disconnect the consumer.
- testClients.getReceiver().getConsumer().close();
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
- /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
- public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P() throws Exception
+ /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteTxP2P() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, false);
- PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- // Disconnect the consumer.
- testClients.getReceiver().getConsumer().close();
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
/** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */
@@ -205,70 +206,76 @@ public class ImmediateMessageTest extends TestCase
testClients.testNoExceptions(testProps);
}
- /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerNoTxPubSub() throws Exception
+ /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, true);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
// Send one message and get a linked no consumers exception.
testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
- /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerTxPubSub() throws Exception
+ /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxPubSub() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, true);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
// Send one message and get a linked no consumers exception.
testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
- /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
- public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxPubSub() throws Exception
+ /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, true);
- PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- // Disconnect the consumer.
- testClients.getReceiver().getConsumer().close();
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
- /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
- public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxPubSub() throws Exception
+ /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteTxPubSub() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, true);
- PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- // Disconnect the consumer.
- testClients.getReceiver().getConsumer().close();
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
protected void setUp() throws Exception
@@ -425,6 +432,14 @@ public class ImmediateMessageTest extends TestCase
}
}
+ public static class MessageMonitor implements MessageListener
+ {
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message): called");
+ }
+ }
+
/**
* Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
* convenience method for code that does anticipate handling connection failures. All exceptions that indicate
@@ -505,13 +520,14 @@ public class ImmediateMessageTest extends TestCase
int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME);
boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME);
boolean transactional = messagingProps.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ boolean durableSubscription = messagingProps.getPropertyAsBoolean(DURABLE_SUBSCRIPTION_PROPNAME);
// Check if any Qpid/AMQP specific flags or options need to be set.
boolean immediate = messagingProps.getPropertyAsBoolean(IMMEDIATE_PROPNAME);
boolean mandatory = messagingProps.getPropertyAsBoolean(MANDATORY_PROPNAME);
boolean needsQpidOptions = immediate | mandatory;
- log.debug("ackMode = " + ackMode);
+ /*log.debug("ackMode = " + ackMode);
log.debug("useTopics = " + useTopics);
log.debug("destinationSendRoot = " + destinationSendRoot);
log.debug("destinationReceiveRoot = " + destinationReceiveRoot);
@@ -522,7 +538,7 @@ public class ImmediateMessageTest extends TestCase
log.debug("transactional = " + transactional);
log.debug("immediate = " + immediate);
log.debug("mandatory = " + mandatory);
- log.debug("needsQpidOptions = " + needsQpidOptions);
+ log.debug("needsQpidOptions = " + needsQpidOptions);*/
// Create connection, sessions and producer/consumer pairs on each session.
Connection connection = createConnection(messagingProps);
@@ -535,7 +551,7 @@ public class ImmediateMessageTest extends TestCase
Session receiverSession = connection.createSession(transactional, ackMode);
Destination publisherProducerDestination =
- useTopics ? publisherSession.createTopic(destinationSendRoot)
+ useTopics ? (Destination) publisherSession.createTopic(destinationSendRoot)
: publisherSession.createQueue(destinationSendRoot);
MessageProducer publisherProducer =
@@ -548,13 +564,29 @@ public class ImmediateMessageTest extends TestCase
createPublisherConsumer
? publisherSession.createConsumer(publisherSession.createQueue(destinationReceiveRoot)) : null;
+ if (publisherConsumer != null)
+ {
+ publisherConsumer.setMessageListener(new MessageMonitor());
+ }
+
MessageProducer receiverProducer =
createReceiverProducer ? receiverSession.createProducer(receiverSession.createQueue(destinationReceiveRoot))
: null;
+ Destination receiverConsumerDestination =
+ useTopics ? (Destination) receiverSession.createTopic(destinationSendRoot)
+ : receiverSession.createQueue(destinationSendRoot);
+
MessageConsumer receiverConsumer =
- createReceiverConsumer ? receiverSession.createConsumer(receiverSession.createQueue(destinationSendRoot))
- : null;
+ createReceiverConsumer
+ ? ((durableSubscription && useTopics)
+ ? receiverSession.createDurableSubscriber((Topic) receiverConsumerDestination, "testsub")
+ : receiverSession.createConsumer(receiverConsumerDestination)) : null;
+
+ if (receiverConsumer != null)
+ {
+ receiverConsumer.setMessageListener(new MessageMonitor());
+ }
// Start listening for incoming messages.
connection.start();
@@ -578,7 +610,8 @@ public class ImmediateMessageTest extends TestCase
public static Message createTestMessage(ProducerConsumerPair client, ParsedProperties testProps) throws JMSException
{
- return client.getSession().createMessage();
+ return client.getSession().createTextMessage("Hello");
+ // return client.getSession().createMessage();
}
/**
@@ -868,9 +901,7 @@ public class ImmediateMessageTest extends TestCase
public static PublisherReceiver connectClients(ParsedProperties testProps)
{
// Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
- PublisherReceiver testClients = createPublisherReceiverPairSharedConnection(testProps);
-
- return testClients;
+ return createPublisherReceiverPairSharedConnection(testProps);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
index 8fbda6f54b..09a32aa3eb 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
@@ -91,42 +91,6 @@ public class MandatoryMessageTest extends TestCase
testClients.testNoExceptions(testProps);
}
- /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteNoTxP2P() throws Exception
- {
- // Ensure transactional sessions are off.
- testProps.setProperty(TRANSACTED_PROPNAME, false);
- testProps.setProperty(PUBSUB_PROPNAME, false);
-
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
- ImmediateMessageTest.PublisherReceiver testClients =
- ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
-
- // Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoRouteException.class);
- }
-
- /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteTxP2P() throws Exception
- {
- // Ensure transactional sessions are on.
- testProps.setProperty(TRANSACTED_PROPNAME, true);
- testProps.setProperty(PUBSUB_PROPNAME, false);
-
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
- ImmediateMessageTest.PublisherReceiver testClients =
- ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
-
- // Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoRouteException.class);
- }
-
/**
* Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but
* the route exists.
@@ -167,68 +131,68 @@ public class MandatoryMessageTest extends TestCase
testClients.testNoExceptions(testProps);
}
- /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
- public void test_QPID_508_MandatoryOkNoTxPubSub() throws Exception
+ /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteNoTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
- testProps.setProperty(PUBSUB_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
- // Send one message with no errors.
- testClients.testNoExceptions(testProps);
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
- /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
- public void test_QPID_508_MandatoryOkTxPubSub() throws Exception
+ /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteTxP2P() throws Exception
{
- // Ensure transactional sessions are off.
+ // Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
- testProps.setProperty(PUBSUB_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
- // Send one message with no errors.
- testClients.testNoExceptions(testProps);
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
- /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteNoTxPubSub() throws Exception
+ /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, true);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
- // Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
}
- /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteTxPubSub() throws Exception
+ /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkTxPubSub() throws Exception
{
- // Ensure transactional sessions are on.
+ // Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, true);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
- // Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
}
/**
@@ -241,6 +205,9 @@ public class MandatoryMessageTest extends TestCase
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, true);
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
@@ -261,6 +228,9 @@ public class MandatoryMessageTest extends TestCase
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, true);
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
@@ -271,6 +241,42 @@ public class MandatoryMessageTest extends TestCase
testClients.testNoExceptions(testProps);
}
+ /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteNoTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
protected void setUp() throws Exception
{
NDC.push(getName());
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
index 9c8cefc492..b584c8c80b 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.exchange;
import org.apache.qpid.jms.Session;
@@ -167,6 +187,12 @@ public class MessagingTestConfigProperties
/** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the durable subscriptions flag from, when doing pub/sub messaging. */
+ public static final String DURABLE_SUBSCRIPTION_PROPNAME = "durableSubscription";
+
+ /** Defines the default value of the durable subscriptions flag. */
+ public static final boolean DURABLE_SUBSCRIPTION_DEFAULT = false;
+
// ====================== Qpid Options and Flags ================================
/** Holds the name of the property to set the exclusive flag from. */
@@ -272,6 +298,7 @@ public class MessagingTestConfigProperties
defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_SUBSCRIPTION_PROPNAME, DURABLE_SUBSCRIPTION_DEFAULT);
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT);
defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT);