diff options
author | Rupert Smith <rupertlssmith@apache.org> | 2007-06-26 16:43:58 +0000 |
---|---|---|
committer | Rupert Smith <rupertlssmith@apache.org> | 2007-06-26 16:43:58 +0000 |
commit | d2a9e42e20edbfd0db53c75a4f0511547ec70319 (patch) | |
tree | 7656b45795eb1aa79d36596090e64bb092d7a354 | |
parent | 37354109282201cc38b2734ab3b33fb5eeafca5c (diff) | |
download | qpid-python-d2a9e42e20edbfd0db53c75a4f0511547ec70319.tar.gz |
QPID-509 Mandatory messages not returned outside a transaction. They are now.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@550849 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 514 insertions, 438 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 52e9505a32..77546d3134 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,17 +20,8 @@ */ package org.apache.qpid.server; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; @@ -53,6 +44,16 @@ import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + public class AMQChannel { public static final int DEFAULT_PREFETCH = 5000; @@ -61,7 +62,7 @@ public class AMQChannel private final int _channelId; - //private boolean _transactional; + // private boolean _transactional; private long _prefetch_HighWaterMark; @@ -113,13 +114,12 @@ public class AMQChannel private Set<Long> _browsedAcks = new HashSet<Long>(); - //Why do we need this reference ? - ritchiem + // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; private boolean _closing; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) - throws AMQException + throws AMQException { _session = session; _channelId = channelId; @@ -166,7 +166,6 @@ public class AMQChannel return _prefetchSize; } - public void setPrefetchSize(long prefetchSize) { _prefetchSize = prefetchSize; @@ -192,18 +191,15 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } - public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException { - - _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, - _txnContext); + _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext); _currentMessage.setPublisher(publisher); } - public void publishContentHeader(ContentHeaderBody contentHeaderBody) - throws AMQException + public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession) + throws AMQException { if (_currentMessage == null) { @@ -215,6 +211,7 @@ public class AMQChannel { _log.trace(debugIdentity() + "Content header received on channel " + _channelId); } + _currentMessage.setContentHeaderBody(contentHeaderBody); _currentMessage.setExpiration(); @@ -224,13 +221,13 @@ public class AMQChannel // check and deliver if header says body length is zero if (contentHeaderBody.bodySize == 0) { + _txnContext.messageProcessed(protocolSession); _currentMessage = null; } } } - public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) - throws AMQException + public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) throws AMQException { if (_currentMessage == null) { @@ -241,12 +238,15 @@ public class AMQChannel { _log.trace(debugIdentity() + "Content body received on channel " + _channelId); } + try { // returns true iff the message was delivered (i.e. if all data was // received - if (_currentMessage.addContentBodyFrame(_storeContext, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody))) + if (_currentMessage.addContentBodyFrame(_storeContext, + protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk( + contentBody))) { // callback to allow the context to do any post message processing // primary use is to allow message return processing in the non-tx case @@ -303,12 +303,13 @@ public class AMQChannel * @throws AMQException if something goes wrong */ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { tag = new AMQShortString("sgen_" + getNextConsumerTag()); } + if (_consumerTag2QueueMap.containsKey(tag)) { throw new ConsumerTagNotUniqueException(); @@ -316,29 +317,28 @@ public class AMQChannel queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); _consumerTag2QueueMap.put(tag, queue); + return tag; } - public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException { if (_log.isDebugEnabled()) { _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size()); _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - - public boolean callback(UnacknowledgedMessage message) throws AMQException { - _log.debug(message); - return true; - } + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug(message); - public void visitComplete() - { - } - }); + return true; + } + + public void visitComplete() + { } + }); } AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); @@ -382,14 +382,17 @@ public class AMQChannel _log.info("No consumers to unsubscribe on channel " + toString()); } } + for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet()) { if (_log.isInfoEnabled()) { _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } + me.getValue().unregisterProtocolSession(session, _channelId, me.getKey()); } + _consumerTag2QueueMap.clear(); } @@ -414,8 +417,8 @@ public class AMQChannel { if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag + - ") with a queue(" + queue + ") for " + consumerTag); + _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag + + ") with a queue(" + queue + ") for " + consumerTag); } } } @@ -458,10 +461,10 @@ public class AMQChannel if (!(_txnContext instanceof NonTransactionalContext)) { -// if (_nonTransactedContext == null) + // if (_nonTransactedContext == null) { - _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + _nonTransactedContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; @@ -472,7 +475,6 @@ public class AMQChannel } } - for (UnacknowledgedMessage unacked : messagesToBeDelivered) { if (unacked.queue != null) @@ -488,7 +490,7 @@ public class AMQChannel // Should we allow access To the DM to directy deliver the message? // As we don't need to check for Consumers or worry about incrementing the message count? -// unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false); + // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false); } } @@ -522,10 +524,10 @@ public class AMQChannel TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { -// if (_nonTransactedContext == null) + // if (_nonTransactedContext == null) { - _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + _nonTransactedContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; @@ -537,51 +539,49 @@ public class AMQChannel if (unacked.queue != null) { - //Redeliver the messages to the front of the queue + // Redeliver the messages to the front of the queue deliveryContext.deliver(unacked.message, unacked.queue, true); - //Deliver increments the message count but we have already deliverted this once so don't increment it again + // Deliver increments the message count but we have already deliverted this once so don't increment it again // this was because deliver did an increment changed this. } else { - _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag + - " but no queue defined and no DeadLetter queue so DROPPING message."); -// _log.error("Requested requeue of message:" + deliveryTag + -// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); -// -// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); -// + _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); + // _log.error("Requested requeue of message:" + deliveryTag + + // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); + // + // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); + // } } else { - _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size()); + _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + + _unacknowledgedMessageMap.size()); if (_log.isDebugEnabled()) { _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - int count = 0; - - public boolean callback(UnacknowledgedMessage message) throws AMQException { - _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" + - "[" + message.deliveryTag + "]"); - return false; // Continue - } + int count = 0; - public void visitComplete() - { + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug( + (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]"); - } - }); + return false; // Continue + } + + public void visitComplete() + { } + }); } } - } - /** * Called to resend all outstanding unacknowledged messages to this same channel. * @@ -603,54 +603,53 @@ public class AMQChannel // Marking messages who still have a consumer for to be resent // and those that don't to be requeued. _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - public boolean callback(UnacknowledgedMessage message) throws AMQException { - AMQShortString consumerTag = message.consumerTag; - AMQMessage msg = message.message; - msg.setRedelivered(true); - if (consumerTag != null) - { - // Consumer exists - if (_consumerTag2QueueMap.containsKey(consumerTag)) - { - msgToResend.add(message); - } - else // consumer has gone - { - msgToRequeue.add(message); - } - } - else + public boolean callback(UnacknowledgedMessage message) throws AMQException { - // Message has no consumer tag, so was "delivered" to a GET - // or consumer no longer registered - // cannot resend, so re-queue. - if (message.queue != null) + AMQShortString consumerTag = message.consumerTag; + AMQMessage msg = message.message; + msg.setRedelivered(true); + if (consumerTag != null) { - if (requeue) + // Consumer exists + if (_consumerTag2QueueMap.containsKey(consumerTag)) { - msgToRequeue.add(message); + msgToResend.add(message); } - else + else // consumer has gone { - _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); + msgToRequeue.add(message); } } else { - _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); + // Message has no consumer tag, so was "delivered" to a GET + // or consumer no longer registered + // cannot resend, so re-queue. + if (message.queue != null) + { + if (requeue) + { + msgToRequeue.add(message); + } + else + { + _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); + } + } + else + { + _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); + } } - } - // false means continue processing - return false; - } + // false means continue processing + return false; + } - public void visitComplete() - { - } - }); + public void visitComplete() + { } + }); // Process Messages to Resend if (_log.isInfoEnabled()) @@ -664,6 +663,7 @@ public class AMQChannel _log.info("No message to resend."); } } + for (UnacknowledgedMessage message : msgToResend) { AMQMessage msg = message.message; @@ -672,22 +672,21 @@ public class AMQChannel // If the client has requested the messages be resent then it is // their responsibility to ensure that thay are capable of receiving them // i.e. The channel hasn't been server side suspended. -// if (isSuspended()) -// { -// _log.info("Channel is suspended so requeuing"); -// //move this message to requeue -// msgToRequeue.add(message); -// } -// else -// { - //release to allow it to be delivered + // if (isSuspended()) + // { + // _log.info("Channel is suspended so requeuing"); + // //move this message to requeue + // msgToRequeue.add(message); + // } + // else + // { + // release to allow it to be delivered msg.release(message.queue); // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. msg.setRedelivered(true); - Subscription sub = msg.getDeliveredSubscription(message.queue); if (sub != null) @@ -704,17 +703,20 @@ public class AMQChannel { if (_log.isDebugEnabled()) { - _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message"); + _log.debug("Subscription(" + System.identityHashCode(sub) + + ") closed during resend so requeuing message"); } - //move this message to requeue + // move this message to requeue msgToRequeue.add(message); } else { if (_log.isDebugEnabled()) { - _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub)); + _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + + System.identityHashCode(sub)); } + sub.addToResendQueue(msg); _unacknowledgedMessageMap.remove(message.deliveryTag); } @@ -725,13 +727,14 @@ public class AMQChannel if (_log.isInfoEnabled()) { - _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss"); + _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + + ")to prevent loss"); } - //move this message to requeue + // move this message to requeue msgToRequeue.add(message); } } // for all messages -// } else !isSuspend + // } else !isSuspend if (_log.isInfoEnabled()) { @@ -748,8 +751,8 @@ public class AMQChannel { if (_nonTransactedContext == null) { - _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + _nonTransactedContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; @@ -783,29 +786,29 @@ public class AMQChannel public void queueDeleted(final AMQQueue queue) throws AMQException { _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - public boolean callback(UnacknowledgedMessage message) throws AMQException { - if (message.queue == queue) + public boolean callback(UnacknowledgedMessage message) throws AMQException { - try - { - message.discard(_storeContext); - message.queue = null; - } - catch (AMQException e) + if (message.queue == queue) { - _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " + - e, e); + try + { + message.discard(_storeContext); + message.queue = null; + } + catch (AMQException e) + { + _log.error( + "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e); + } } + + return false; } - return false; - } - public void visitComplete() - { - } - }); + public void visitComplete() + { } + }); } /** @@ -834,6 +837,7 @@ public class AMQChannel } } + checkSuspension(); } @@ -851,8 +855,9 @@ public class AMQChannel { boolean suspend; - suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark) - || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()); + suspend = + ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)) + || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes())); setSuspended(suspend); } @@ -873,7 +878,7 @@ public class AMQChannel if (wasSuspended) { _log.debug("Unsuspending channel " + this); - //may need to deliver queued messages + // may need to deliver queued messages for (AMQQueue q : _consumerTag2QueueMap.values()) { q.deliverAsync(); @@ -897,6 +902,7 @@ public class AMQChannel { throw new AMQException("Fatal error: commit called on non-transactional channel"); } + _txnContext.commit(); } @@ -911,6 +917,7 @@ public class AMQChannel sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional()); sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark); sb.append("/").append(_prefetch_HighWaterMark); + return sb.toString(); } @@ -934,14 +941,13 @@ public class AMQChannel for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); - session.getProtocolOutputConverter().writeReturn(message, _channelId, - bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); + session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); } + _returnMessages.clear(); } - public boolean wouldSuspend(AMQMessage msg) { if (isSuspended()) @@ -950,19 +956,20 @@ public class AMQChannel } else { - boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark); + boolean willSuspend = + ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark)); if (!willSuspend) { final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes(); - willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize); + willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < (msg.getSize() + unackedSize)); } - if (willSuspend) { setSuspended(true); } + return willSuspend; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index ab103fbd2a..a3aa6e7f5d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -169,7 +169,7 @@ public class DestNameExchange extends AbstractExchange if (queues == null || queues.isEmpty()) { String msg = "Routing key " + routingKey + " is not known to " + this; - if (info.isMandatory()) + if (info.isMandatory() || info.isImmediate()) { throw new NoRouteException(msg, payload); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 0202ccb762..418cf64c56 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -20,13 +20,18 @@ */ package org.apache.qpid.server.exchange; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; -import java.util.LinkedList; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; import javax.management.JMException; import javax.management.MBeanException; @@ -41,24 +46,21 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; public class DestWildExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(DestWildExchange.class); - private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); - // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); + private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); private static final String TOPIC_SEPARATOR = "."; private static final String AMQP_STAR = "*"; private static final String AMQP_HASH = "#"; @@ -90,7 +92,7 @@ public class DestWildExchange extends AbstractExchange queueList.add(q.getName().toString()); } - Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])}; + Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) }; CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } @@ -118,7 +120,6 @@ public class DestWildExchange extends AbstractExchange } // End of MBean class - public AMQShortString getType() { return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; @@ -140,6 +141,7 @@ public class DestWildExchange extends AbstractExchange { queueList = _routingKey2queues.get(routingKey); } + if (!queueList.contains(queue)) { queueList.add(queue); @@ -165,8 +167,8 @@ public class DestWildExchange extends AbstractExchange for (int index = 0; index < size; index++) { - //if there are more levels - if (index + 1 < size) + // if there are more levels + if ((index + 1) < size) { if (_subscription.get(index).equals(AMQP_HASH)) { @@ -175,7 +177,7 @@ public class DestWildExchange extends AbstractExchange // we don't need #.# delete this one _subscription.remove(index); size--; - //redo this normalisation + // redo this normalisation index--; } @@ -186,7 +188,7 @@ public class DestWildExchange extends AbstractExchange _subscription.add(index + 1, _subscription.remove(index)); } } - }//if we have more levels + } // if we have more levels } StringBuilder sb = new StringBuilder(); @@ -211,9 +213,9 @@ public class DestWildExchange extends AbstractExchange List<AMQQueue> queues = getMatchedQueues(routingKey); // if we have no registered queues we have nothing to do // TODO: add support for the immediate flag - if (queues == null || queues.size() == 0) + if ((queues == null) || queues.isEmpty()) { - if (info.isMandatory()) + if (info.isMandatory() || info.isImmediate()) { String msg = "Topic " + routingKey + " is not known to " + this; throw new NoRouteException(msg, payload); @@ -222,6 +224,7 @@ public class DestWildExchange extends AbstractExchange { _logger.warn("No queues found for routing key " + routingKey); _logger.warn("Routing map contains: " + _routingKey2queues); + return; } } @@ -238,14 +241,15 @@ public class DestWildExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException { List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); - return queues != null && queues.contains(queue); - } + return (queues != null) && queues.contains(queue); + } public boolean isBound(AMQShortString routingKey) throws AMQException { List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); - return queues != null && !queues.isEmpty(); + + return (queues != null) && !queues.isEmpty(); } public boolean isBound(AMQQueue queue) throws AMQException @@ -257,6 +261,7 @@ public class DestWildExchange extends AbstractExchange return true; } } + return false; } @@ -275,16 +280,18 @@ public class DestWildExchange extends AbstractExchange List<AMQQueue> queues = _routingKey2queues.get(routingKey); if (queues == null) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + - " with routing key " + routingKey + ". No queue was registered with that routing key"); + throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + + " with routing key " + routingKey + ". No queue was registered with that routing key"); } + boolean removedQ = queues.remove(queue); if (!removedQ) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + - " with routing key " + routingKey); + throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + + " with routing key " + routingKey); } + if (queues.isEmpty()) { _routingKey2queues.remove(routingKey); @@ -304,7 +311,6 @@ public class DestWildExchange extends AbstractExchange } } - private List<AMQQueue> getMatchedQueues(AMQShortString routingKey) { List<AMQQueue> list = new LinkedList<AMQQueue>(); @@ -334,7 +340,6 @@ public class DestWildExchange extends AbstractExchange queueList.add(queTok.nextToken()); } - int depth = 0; boolean matching = true; boolean done = false; @@ -343,25 +348,26 @@ public class DestWildExchange extends AbstractExchange while (matching && !done) { - if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip) + if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip))) { done = true; // if it was the routing key that ran out of digits - if (routingkeyList.size() == depth + routingskip) + if (routingkeyList.size() == (depth + routingskip)) { if (queueList.size() > (depth + queueskip)) - { // a hash and it is the last entry - matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1; + { // a hash and it is the last entry + matching = + queueList.get(depth + queueskip).equals(AMQP_HASH) + && (queueList.size() == (depth + queueskip + 1)); } } - else if (routingkeyList.size() > depth + routingskip) + else if (routingkeyList.size() > (depth + routingskip)) { // There is still more routing key to check matching = false; } - continue; } @@ -377,27 +383,33 @@ public class DestWildExchange extends AbstractExchange else if (queueList.get(depth + queueskip).equals(AMQP_HASH)) { // Is this a # at the end - if (queueList.size() == depth + queueskip + 1) + if (queueList.size() == (depth + queueskip + 1)) { done = true; + continue; } // otherwise # in the middle - while (routingkeyList.size() > depth + routingskip) + while (routingkeyList.size() > (depth + routingskip)) { if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1))) { queueskip++; depth++; + break; } + routingskip++; } + continue; } + matching = false; } + depth++; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index d20e3fa27b..5a6301548b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -1,27 +1,36 @@ /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
-
package org.apache.qpid.server.exchange;
-import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -36,16 +45,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
{
@@ -63,7 +63,7 @@ public class FanoutExchange extends AbstractExchange private final class FanoutExchangeMBean extends ExchangeMBean
{
@MBeanConstructor("Creates an MBean for AMQ fanout exchange")
- public FanoutExchangeMBean() throws JMException
+ public FanoutExchangeMBean() throws JMException
{
super();
_exchangeType = "fanout";
@@ -79,9 +79,7 @@ public class FanoutExchange extends AbstractExchange {
String queueName = queue.getName().toString();
-
-
- Object[] bindingItemValues = {queueName, new String[] {queueName}};
+ Object[] bindingItemValues = { queueName, new String[] { queueName } };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -98,7 +96,7 @@ public class FanoutExchange extends AbstractExchange }
try
- {
+ {
queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
}
catch (AMQException ex)
@@ -107,8 +105,7 @@ public class FanoutExchange extends AbstractExchange }
}
- }// End of MBean class
-
+ } // End of MBean class
protected ExchangeMBean createMBean() throws AMQException
{
@@ -147,11 +144,9 @@ public class FanoutExchange extends AbstractExchange {
assert queue != null;
-
if (!_queues.remove(queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- ". ");
+ throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + ". ");
}
}
@@ -159,10 +154,10 @@ public class FanoutExchange extends AbstractExchange {
final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
final AMQShortString routingKey = publishInfo.getRoutingKey();
- if (_queues == null || _queues.isEmpty())
+ if ((_queues == null) || _queues.isEmpty())
{
String msg = "No queues bound to " + this;
- if (publishInfo.isMandatory())
+ if (publishInfo.isMandatory() || publishInfo.isImmediate())
{
throw new NoRouteException(msg, payload);
}
@@ -193,13 +188,12 @@ public class FanoutExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey) throws AMQException
{
- return _queues != null && !_queues.isEmpty();
+ return (_queues != null) && !_queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
{
-
return _queues.contains(queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index b4b2bc20bc..b5c03a7291 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -229,7 +229,7 @@ public class HeadersExchange extends AbstractExchange String msg = "Exchange " + getName() + ": message not routable."; - if (payload.getMessagePublishInfo().isMandatory()) + if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate()) { throw new NoRouteException(msg, payload); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index d430f1af94..5bfd47b469 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,24 +20,13 @@ */ package org.apache.qpid.server.protocol; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.security.Principal; - -import javax.management.JMException; -import javax.security.sasl.SaslServer; - import org.apache.log4j.Logger; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; import org.apache.mina.transport.vmpipe.VmPipeAddress; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; @@ -46,22 +35,34 @@ import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.*; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -public class AMQMinaProtocolSession implements AMQProtocolSession, - Managable +import javax.management.JMException; +import javax.security.sasl.SaslServer; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.security.Principal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; + +public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); @@ -111,25 +112,20 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private ProtocolOutputConverter _protocolOutputConverter; private Principal _authorizedID; - public ManagedObject getManagedObject() { return _managedObject; } - - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, - AMQCodecFactory codecFactory) - throws AMQException + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) + throws AMQException { _stateManager = new AMQStateManager(virtualHostRegistry, this); _minaProtocolSession = session; session.setAttachment(this); - _codecFactory = codecFactory; - try { IoServiceConfig config = session.getServiceConfig(); @@ -140,16 +136,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, catch (RuntimeException e) { e.printStackTrace(); - // throw e; + // throw e; } -// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); + // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, - AMQCodecFactory codecFactory, AMQStateManager stateManager) - throws AMQException + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, + AMQStateManager stateManager) throws AMQException { _stateManager = stateManager; _minaProtocolSession = session; @@ -182,8 +177,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return (AMQProtocolSession) minaProtocolSession.getAttachment(); } - public void dataBlockReceived(AMQDataBlock message) - throws Exception + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; if (message instanceof ProtocolInitiation) @@ -203,8 +197,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - private void frameReceived(AMQFrame frame) - throws AMQException + private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); @@ -252,13 +245,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, String locales = "en_US"; // Interfacing with generated code - be aware of possible changes to parameter order as versions change. - AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - locales.getBytes(), // locales - mechanisms.getBytes(), // mechanisms - null, // serverProperties - (short) getProtocolMajorVersion(), // versionMajor - (short) getProtocolMinorVersion()); // versionMinor + AMQFrame response = + ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short) getProtocolMajorVersion(), // versionMajor + (short) getProtocolMinorVersion()); // versionMinor _minaProtocolSession.write(response); } catch (AMQException e) @@ -269,21 +262,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, // TODO: Close connection (but how to wait until message is sent?) // ritchiem 2006-12-04 will this not do? -// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); -// future.join(); -// close connection + // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); + // future.join(); + // close connection } } - private void methodFrameReceived(int channelId, AMQMethodBody methodBody) { - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, - methodBody); + final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); - //Check that this channel is not closing + // Check that this channel is not closing if (channelAwaitingClosure(channelId)) { if ((evt.getMethod() instanceof ChannelCloseOkBody)) @@ -299,11 +290,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); } + return; } } - try { try @@ -315,10 +306,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { for (AMQMethodListener listener : _frameListeners) { - wasAnyoneInterested = listener.methodReceived(evt) || - wasAnyoneInterested; + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } + if (!wasAnyoneInterested) { throw new AMQNoMethodHandlerException(evt); @@ -332,6 +323,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Closing channel due to: " + e.getMessage()); } + writeFrame(e.getCloseFrame(channelId)); closeChannel(channelId); } @@ -341,14 +333,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); } + if (_logger.isInfoEnabled()) { _logger.info("Closing connection due to: " + e.getMessage()); } + closeSession(); - AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); + AMQConnectionException ce = + evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(ce.getCloseFrame(channelId)); @@ -360,6 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Closing connection due to: " + e.getMessage()); } + closeSession(); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(e.getCloseFrame(channelId)); @@ -372,17 +368,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { listener.error(e); } + _minaProtocolSession.close(); } } - private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); - channel.publishContentHeader(body); + channel.publishContentHeader(body, this); } @@ -427,15 +423,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); } + return channel; } public AMQChannel getChannel(int channelId) throws AMQException { - final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) - ? _cachedChannels[channelId] - : _channelMap.get(channelId); - if (channel == null || channel.isClosing()) + final AMQChannel channel = + ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); + if ((channel == null) || channel.isClosing()) { return null; } @@ -466,8 +462,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, if (_channelMap.size() == _maxNoOfChannels) { - String errorMessage = toString() + ": maximum number of channels has been reached (" + - _maxNoOfChannels + "); can't create channel"; + String errorMessage = + toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + + "); can't create channel"; _logger.error(errorMessage); throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); } @@ -480,6 +477,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _cachedChannels[channelId] = channel; } + checkForNotification(); } @@ -504,7 +502,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void commitTransactions(AMQChannel channel) throws AMQException { - if (channel != null && channel.isTransactional()) + if ((channel != null) && channel.isTransactional()) { channel.commit(); } @@ -512,7 +510,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void rollbackTransactions(AMQChannel channel) throws AMQException { - if (channel != null && channel.isTransactional()) + if ((channel != null) && channel.isTransactional()) { channel.rollback(); } @@ -597,6 +595,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { channel.close(this); } + _channelMap.clear(); for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) { @@ -615,6 +614,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _managedObject.unregister(); } + for (Task task : _taskList) { task.doTask(this); @@ -687,6 +687,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE))); } + if (_clientProperties.getString(ClientProperties.version.toString()) != null) { _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString())); @@ -715,7 +716,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public boolean isProtocolVersion(byte major, byte minor) { - return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor; + return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); } public VersionSpecificRegistry getRegistry() @@ -723,13 +724,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _registry; } - public Object getClientIdentifier() { return _minaProtocolSession.getRemoteAddress(); } - public VirtualHost getVirtualHost() { return _virtualHost; @@ -769,6 +768,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public String getClientVersion() { - return _clientVersion == null ? null : _clientVersion.toString(); + return (_clientVersion == null) ? null : _clientVersion.toString(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 66a14f3bfb..1e1eaa2813 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -20,16 +20,6 @@ */ package org.apache.qpid.server.queue; -import java.text.MessageFormat; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.management.JMException; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -44,6 +34,16 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import javax.management.JMException; + +import java.text.MessageFormat; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described * fully in RFC 006. @@ -590,7 +590,7 @@ public class AMQQueue implements Managable, Comparable delete(); } - public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException + /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { // fixme not sure what this is doing. should we be passing deliverFirst through here? // This code is not used so when it is perhaps it should @@ -606,7 +606,7 @@ public class AMQQueue implements Managable, Comparable // from the queue: dequeue(storeContext, msg); } - } + }*/ // public DeliveryManager getDeliveryManager() // { diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java index c8a244743e..048fcfb0b3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java @@ -25,6 +25,7 @@ import junit.framework.TestCase; import org.apache.log4j.NDC;
import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*;
@@ -113,70 +114,70 @@ public class ImmediateMessageTest extends TestCase testClients.testNoExceptions(testProps);
}
- /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerNoTxP2P() throws Exception
+ /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, false);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
// Send one message and get a linked no consumers exception.
testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
- /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerTxP2P() throws Exception
+ /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, false);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
// Send one message and get a linked no consumers exception.
testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
- /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
- public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P() throws Exception
+ /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteNoTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, false);
- PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- // Disconnect the consumer.
- testClients.getReceiver().getConsumer().close();
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
- /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
- public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P() throws Exception
+ /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteTxP2P() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, false);
- PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- // Disconnect the consumer.
- testClients.getReceiver().getConsumer().close();
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
/** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */
@@ -205,70 +206,76 @@ public class ImmediateMessageTest extends TestCase testClients.testNoExceptions(testProps);
}
- /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerNoTxPubSub() throws Exception
+ /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, true);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
// Send one message and get a linked no consumers exception.
testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
- /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerTxPubSub() throws Exception
+ /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxPubSub() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, true);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
// Send one message and get a linked no consumers exception.
testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
- /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
- public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxPubSub() throws Exception
+ /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, true);
- PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- // Disconnect the consumer.
- testClients.getReceiver().getConsumer().close();
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
- /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
- public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxPubSub() throws Exception
+ /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteTxPubSub() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, true);
- PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- // Disconnect the consumer.
- testClients.getReceiver().getConsumer().close();
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
protected void setUp() throws Exception
@@ -425,6 +432,14 @@ public class ImmediateMessageTest extends TestCase }
}
+ public static class MessageMonitor implements MessageListener
+ {
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message): called");
+ }
+ }
+
/**
* Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
* convenience method for code that does anticipate handling connection failures. All exceptions that indicate
@@ -505,13 +520,14 @@ public class ImmediateMessageTest extends TestCase int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME);
boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME);
boolean transactional = messagingProps.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ boolean durableSubscription = messagingProps.getPropertyAsBoolean(DURABLE_SUBSCRIPTION_PROPNAME);
// Check if any Qpid/AMQP specific flags or options need to be set.
boolean immediate = messagingProps.getPropertyAsBoolean(IMMEDIATE_PROPNAME);
boolean mandatory = messagingProps.getPropertyAsBoolean(MANDATORY_PROPNAME);
boolean needsQpidOptions = immediate | mandatory;
- log.debug("ackMode = " + ackMode);
+ /*log.debug("ackMode = " + ackMode);
log.debug("useTopics = " + useTopics);
log.debug("destinationSendRoot = " + destinationSendRoot);
log.debug("destinationReceiveRoot = " + destinationReceiveRoot);
@@ -522,7 +538,7 @@ public class ImmediateMessageTest extends TestCase log.debug("transactional = " + transactional);
log.debug("immediate = " + immediate);
log.debug("mandatory = " + mandatory);
- log.debug("needsQpidOptions = " + needsQpidOptions);
+ log.debug("needsQpidOptions = " + needsQpidOptions);*/
// Create connection, sessions and producer/consumer pairs on each session.
Connection connection = createConnection(messagingProps);
@@ -535,7 +551,7 @@ public class ImmediateMessageTest extends TestCase Session receiverSession = connection.createSession(transactional, ackMode);
Destination publisherProducerDestination =
- useTopics ? publisherSession.createTopic(destinationSendRoot)
+ useTopics ? (Destination) publisherSession.createTopic(destinationSendRoot)
: publisherSession.createQueue(destinationSendRoot);
MessageProducer publisherProducer =
@@ -548,13 +564,29 @@ public class ImmediateMessageTest extends TestCase createPublisherConsumer
? publisherSession.createConsumer(publisherSession.createQueue(destinationReceiveRoot)) : null;
+ if (publisherConsumer != null)
+ {
+ publisherConsumer.setMessageListener(new MessageMonitor());
+ }
+
MessageProducer receiverProducer =
createReceiverProducer ? receiverSession.createProducer(receiverSession.createQueue(destinationReceiveRoot))
: null;
+ Destination receiverConsumerDestination =
+ useTopics ? (Destination) receiverSession.createTopic(destinationSendRoot)
+ : receiverSession.createQueue(destinationSendRoot);
+
MessageConsumer receiverConsumer =
- createReceiverConsumer ? receiverSession.createConsumer(receiverSession.createQueue(destinationSendRoot))
- : null;
+ createReceiverConsumer
+ ? ((durableSubscription && useTopics)
+ ? receiverSession.createDurableSubscriber((Topic) receiverConsumerDestination, "testsub")
+ : receiverSession.createConsumer(receiverConsumerDestination)) : null;
+
+ if (receiverConsumer != null)
+ {
+ receiverConsumer.setMessageListener(new MessageMonitor());
+ }
// Start listening for incoming messages.
connection.start();
@@ -578,7 +610,8 @@ public class ImmediateMessageTest extends TestCase public static Message createTestMessage(ProducerConsumerPair client, ParsedProperties testProps) throws JMSException
{
- return client.getSession().createMessage();
+ return client.getSession().createTextMessage("Hello");
+ // return client.getSession().createMessage();
}
/**
@@ -868,9 +901,7 @@ public class ImmediateMessageTest extends TestCase public static PublisherReceiver connectClients(ParsedProperties testProps)
{
// Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
- PublisherReceiver testClients = createPublisherReceiverPairSharedConnection(testProps);
-
- return testClients;
+ return createPublisherReceiverPairSharedConnection(testProps);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java index 8fbda6f54b..09a32aa3eb 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java @@ -91,42 +91,6 @@ public class MandatoryMessageTest extends TestCase testClients.testNoExceptions(testProps);
}
- /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteNoTxP2P() throws Exception
- {
- // Ensure transactional sessions are off.
- testProps.setProperty(TRANSACTED_PROPNAME, false);
- testProps.setProperty(PUBSUB_PROPNAME, false);
-
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
- ImmediateMessageTest.PublisherReceiver testClients =
- ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
-
- // Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoRouteException.class);
- }
-
- /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteTxP2P() throws Exception
- {
- // Ensure transactional sessions are on.
- testProps.setProperty(TRANSACTED_PROPNAME, true);
- testProps.setProperty(PUBSUB_PROPNAME, false);
-
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
- ImmediateMessageTest.PublisherReceiver testClients =
- ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
-
- // Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoRouteException.class);
- }
-
/**
* Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but
* the route exists.
@@ -167,68 +131,68 @@ public class MandatoryMessageTest extends TestCase testClients.testNoExceptions(testProps);
}
- /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
- public void test_QPID_508_MandatoryOkNoTxPubSub() throws Exception
+ /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteNoTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
- testProps.setProperty(PUBSUB_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
- // Send one message with no errors.
- testClients.testNoExceptions(testProps);
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
- /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
- public void test_QPID_508_MandatoryOkTxPubSub() throws Exception
+ /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteTxP2P() throws Exception
{
- // Ensure transactional sessions are off.
+ // Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
- testProps.setProperty(PUBSUB_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
- // Send one message with no errors.
- testClients.testNoExceptions(testProps);
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
- /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteNoTxPubSub() throws Exception
+ /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, true);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
- // Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
}
- /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteTxPubSub() throws Exception
+ /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkTxPubSub() throws Exception
{
- // Ensure transactional sessions are on.
+ // Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, true);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
- // collect its messages).
- testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
-
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
- // Send one message and get a linked no consumers exception.
- testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
}
/**
@@ -241,6 +205,9 @@ public class MandatoryMessageTest extends TestCase testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, true);
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
@@ -261,6 +228,9 @@ public class MandatoryMessageTest extends TestCase testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, true);
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
ImmediateMessageTest.PublisherReceiver testClients =
ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
@@ -271,6 +241,42 @@ public class MandatoryMessageTest extends TestCase testClients.testNoExceptions(testProps);
}
+ /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteNoTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
protected void setUp() throws Exception
{
NDC.push(getName());
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java index 9c8cefc492..b584c8c80b 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java @@ -1,3 +1,23 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.exchange;
import org.apache.qpid.jms.Session;
@@ -167,6 +187,12 @@ public class MessagingTestConfigProperties /** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the durable subscriptions flag from, when doing pub/sub messaging. */
+ public static final String DURABLE_SUBSCRIPTION_PROPNAME = "durableSubscription";
+
+ /** Defines the default value of the durable subscriptions flag. */
+ public static final boolean DURABLE_SUBSCRIPTION_DEFAULT = false;
+
// ====================== Qpid Options and Flags ================================
/** Holds the name of the property to set the exclusive flag from. */
@@ -272,6 +298,7 @@ public class MessagingTestConfigProperties defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_SUBSCRIPTION_PROPNAME, DURABLE_SUBSCRIPTION_DEFAULT);
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT);
defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT);
|