diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid')
17 files changed, 290 insertions, 114 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 5dd6619cff..1ebe5fa0a2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -319,6 +319,25 @@ public class AMQChannel public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException { + if (_log.isDebugEnabled()) + { + _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size()); + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug(message); + + return true; + } + + public void visitComplete() + { + } + }); + } + AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); if (q != null) { @@ -342,9 +361,23 @@ public class AMQChannel private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException { - _log.info("Unsubscribing all consumers on channel " + toString()); + if (_log.isInfoEnabled()) + { + if (!_consumerTag2QueueMap.isEmpty()) + { + _log.info("Unsubscribing all consumers on channel " + toString()); + } + else + { + _log.info("No consumers to unsubscribe on channel " + toString()); + } + } for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet()) { + if (_log.isInfoEnabled()) + { + _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); + } me.getValue().unregisterProtocolSession(session, _channelId, me.getKey()); } _consumerTag2QueueMap.clear(); @@ -369,7 +402,11 @@ public class AMQChannel } else { - _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity()); + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag + + ") with a queue(" + queue + ") for " + consumerTag); + } } } @@ -395,25 +432,38 @@ public class AMQChannel */ public void requeue() throws AMQException { + if (_log.isInfoEnabled()) + { + _log.info("Requeuing for " + toString()); + } + // we must create a new map since all the messages will get a new delivery tag when they are redelivered Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); + if (_log.isDebugEnabled()) + { + _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages."); + } // Deliver these messages out of the transaction as their delivery was never // part of the transaction only the receive. - TransactionalContext deliveryContext; - if (!(_txnContext instanceof NonTransactionalContext)) + TransactionalContext deliveryContext = null; + + if (!messagesToBeDelivered.isEmpty()) { - if (_nonTransactedContext == null) + if (!(_txnContext instanceof NonTransactionalContext)) { - _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); - } +// if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } - deliveryContext = _nonTransactedContext; - } - else - { - deliveryContext = _txnContext; + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; + } } @@ -421,6 +471,10 @@ public class AMQChannel { if (unacked.queue != null) { + // Ensure message is released for redelivery + unacked.message.release(); + + // Mark message redelivered unacked.message.setRedelivered(true); // Deliver Message @@ -459,7 +513,7 @@ public class AMQChannel TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - if (_nonTransactedContext == null) +// if (_nonTransactedContext == null) { _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); @@ -472,13 +526,12 @@ public class AMQChannel deliveryContext = _txnContext; } - if (unacked.queue != null) { //Redeliver the messages to the front of the queue deliveryContext.deliver(unacked.message, unacked.queue, true); - - unacked.message.decrementReference(_storeContext); + //Deliver increments the message count but we have already deliverted this once so don't increment it again + // this was because deliver did an increment changed this. } else { @@ -489,7 +542,6 @@ public class AMQChannel // // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); // -// unacked.message.decrementReference(_storeContext); } } else @@ -656,15 +708,16 @@ public class AMQChannel } sub.addToResendQueue(msg); _unacknowledgedMessageMap.remove(message.deliveryTag); - // Don't decrement as we are bypassing the normal deliver which increments - // this is why there is a decrement on the Requeue as deliver will increment. - // msg.decrementReference(_storeContext); } } // sync(sub.getSendLock) } else { - _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss"); + + if (_log.isInfoEnabled()) + { + _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss"); + } //move this message to requeue msgToRequeue.add(message); } @@ -706,7 +759,6 @@ public class AMQChannel deliveryContext.deliver(message.message, message.queue, true); _unacknowledgedMessageMap.remove(message.deliveryTag); - message.message.decrementReference(_storeContext); } } @@ -760,8 +812,18 @@ public class AMQChannel { synchronized (_unacknowledgedMessageMap.getLock()) { + if (_log.isDebugEnabled()) + { + _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size()); + } + _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext); checkSuspension(); + if (_log.isDebugEnabled()) + { + _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size()); + } + } } @@ -775,12 +837,6 @@ public class AMQChannel return _unacknowledgedMessageMap; } - public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, AMQShortString consumerTag, AMQQueue queue) - { - _browsedAcks.add(deliveryTag); - addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - } - private void checkSuspension() { boolean suspend; diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 820f0122f5..fb16267d97 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -37,6 +37,10 @@ public abstract class RequiredDeliveryException extends AMQException { super(message); _amqMessage = payload; + // Increment the reference as this message is in the routing phase + // and so will have the ref decremented as routing fails. + // we need to keep this message around so we can return it in the + // handler. So increment here. payload.incrementReference(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index c987c12154..aac9408247 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -101,6 +101,8 @@ public class TxAck implements TxnOp for (UnacknowledgedMessage msg : _unacked) { msg.restoreTransientMessageData(); + + //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); } } @@ -124,7 +126,7 @@ public class TxAck implements TxnOp _map.remove(_unacked); for (UnacknowledgedMessage msg : _unacked) { - msg.clearTransientMessageData(); + msg.clearTransientMessageData(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 940b5b2bf1..b8c5e821f7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -39,7 +39,6 @@ public class UnacknowledgedMessage this.message = message; this.consumerTag = consumerTag; this.deliveryTag = deliveryTag; - message.incrementReference(); } public String toString() @@ -63,6 +62,7 @@ public class UnacknowledgedMessage { message.dequeue(storeContext, queue); } + //if the queue is null then the message is waiting to be acked, but has been removed. message.decrementReference(storeContext); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 7d18043f5c..8bab96a11b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -29,9 +29,12 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.log4j.Logger; public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody> { + private static final Logger _log = Logger.getLogger(BasicCancelMethodHandler.class); + private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler(); public static BasicCancelMethodHandler getInstance() @@ -55,6 +58,12 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC throw body.getChannelNotFoundException(evt.getChannelId()); } + if (_log.isDebugEnabled()) + { + _log.debug("BasicCancel: for:" + body.consumerTag + + " nowait:" + body.nowait); + } + channel.unsubscribeConsumer(protocolSession, body.consumerTag); if (!body.nowait) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index da61f2ffd5..56eae279dc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -25,6 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; @@ -67,12 +69,22 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } else { + if (_log.isDebugEnabled()) + { + _log.debug("BasicConsume: from '" + body.queue + + "' for:" + body.consumerTag + + " nowait:" + body.nowait + + " args:" + body.arguments); + } AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue); if (queue == null) { - _log.info("No queue for '" + body.queue + "'"); + if (_log.isTraceEnabled()) + { + _log.trace("No queue for '" + body.queue + "'"); + } if (body.queue != null) { String msg = "No such queue, '" + body.queue + "'"; @@ -105,14 +117,34 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } catch (org.apache.qpid.AMQInvalidArgumentException ise) { - _log.info("Closing connection due to invalid selector"); - throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage()); + _log.debug("Closing connection due to invalid selector"); + // Why doesn't this ChannelException work. +// throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, + (byte) 8, (byte) 0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId + BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId + AMQConstant.INVALID_ARGUMENT.getCode(), // replyCode + new AMQShortString(ise.getMessage()))); // replyText } catch (ConsumerTagNotUniqueException e) { AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Non-unique consumer tag, '" + body.consumerTag + "'"); + // If the above doesn't work then perhaps this is wrong too. +// throw body.getConnectionException(AMQConstant.NOT_ALLOWED, +// "Non-unique consumer tag, '" + body.consumerTag + "'"); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg)); // replyText } catch (AMQQueue.ExistingExclusiveSubscription e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index 4e77a5e8b9..14687c40ae 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -52,13 +52,13 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR int channelId = evt.getChannelId(); - if (_logger.isTraceEnabled()) - { - _logger.trace("Rejecting:" + evt.getMethod().deliveryTag + - ": Requeue:" + evt.getMethod().requeue + -// ": Resend:" + evt.getMethod().resend + - " on channel:" + channelId); - } +// if (_logger.isDebugEnabled()) +// { +// _logger.debug("Rejecting:" + evt.getMethod().deliveryTag + +// ": Requeue:" + evt.getMethod().requeue + +//// ": Resend:" + evt.getMethod().resend + +// " on channel:" + channelId); +// } AMQChannel channel = session.getChannel(channelId); @@ -67,9 +67,9 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR throw evt.getMethod().getChannelNotFoundException(channelId); } - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting:" + evt.getMethod().deliveryTag + + _logger.debug("Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue + // ": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 777784ca30..1f4f1f9221 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -51,8 +51,11 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos { AMQProtocolSession session = stateManager.getProtocolSession(); ChannelCloseBody body = evt.getMethod(); - _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + - " and method " + body.methodId); + if (_logger.isInfoEnabled()) + { + _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + + " and method " + body.methodId); + } int channelId = evt.getChannelId(); AMQChannel channel = session.getChannel(channelId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index 21da03d226..b086cad67f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> +public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> { private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class); @@ -49,8 +49,11 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C { AMQProtocolSession session = stateManager.getProtocolSession(); final ConnectionCloseBody body = evt.getMethod(); - _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" + - body.replyText + " for " + session); + if (_logger.isInfoEnabled()) + { + _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" + + body.replyText + " for " + session); + } try { session.closeSession(); @@ -62,7 +65,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); + final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0); session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 03c7051aac..d8b7814d31 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -176,6 +176,8 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } else { + _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); + // Be aware of possible changes to parameter order as versions change. protocolSession.write(ConnectionCloseBody.createAMQFrame(0, session.getProtocolMajorVersion(), @@ -185,7 +187,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter 200, // replyCode new AMQShortString(throwable.getMessage()) // replyText )); - _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); protocolSession.close(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 6d375c89fe..cdf316f2d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -45,9 +45,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -/** - * Combines the information that make up a deliverable message into a more manageable form. - */ +/** Combines the information that make up a deliverable message into a more manageable form. */ public class AMQMessage { private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -92,9 +90,10 @@ public class AMQMessage return _taken.get(); } + private final int hashcode = System.identityHashCode(this); public String debugIdentity() { - return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")"; + return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; } /** @@ -206,7 +205,7 @@ public class AMQMessage _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { - _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId); + _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")"); } } @@ -363,7 +362,7 @@ public class AMQMessage if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); + _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } } @@ -374,6 +373,7 @@ public class AMQMessage * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed + * @param storeContext */ public void decrementReference(StoreContext storeContext) throws MessageCleanupException { @@ -387,9 +387,7 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); - - + _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } // must check if the handle is null since there may be cases where we decide to throw away a message @@ -410,7 +408,7 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); + _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5)); if (_referenceCount.get() < 0) { Thread.dumpStack(); @@ -418,7 +416,7 @@ public class AMQMessage } if (_referenceCount.get() < 0) { - throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0."); + throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0."); } } } @@ -459,7 +457,10 @@ public class AMQMessage public void release() { - _log.trace("Releasing Message:" + debugIdentity()); + if (_log.isTraceEnabled()) + { + _log.trace("Releasing Message:" + debugIdentity()); + } _taken.set(false); _takenBySubcription = null; } @@ -572,7 +573,7 @@ public class AMQMessage List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues(); if (_log.isDebugEnabled()) { - _log.debug("Delivering message " + _messageId + " to " + destinationQueues); + _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues); } try { @@ -589,6 +590,8 @@ public class AMQMessage for (AMQQueue q : destinationQueues) { + //Increment the references to this message for each queue delivery. + incrementReference(); //normal deliver so add this message at the end. _txnContext.deliver(this, q, false); } @@ -596,6 +599,7 @@ public class AMQMessage finally { destinationQueues.clear(); + // Remove refence for routing process . Reference count should now == delivered queue count decrementReference(storeContext); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 7c2fe73386..78f144703b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -516,7 +516,7 @@ public class AMQQueue implements Managable, Comparable { if (_logger.isInfoEnabled()) { - _logger.warn("Auto-deleteing queue:" + this); + _logger.info("Auto-deleteing queue:" + this); } autodelete(); // we need to manually fire the event to the removed subscription (which was the last one left for this @@ -624,7 +624,6 @@ public class AMQQueue implements Managable, Comparable try { msg.dequeue(storeContext, this); - msg.decrementReference(storeContext); } catch (MessageCleanupException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 87868f0b25..6122d191f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -383,6 +383,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return count; } + /** + This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. + */ private AMQMessage getNextMessage() throws AMQException { return getNextMessage(_messages, null); @@ -392,13 +395,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { AMQMessage message = messages.peek(); - while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub))) + //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.) + while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub)) { //remove the already taken message AMQMessage removed = messages.poll(); assert removed == message; - + _totalMessageSize.addAndGet(-message.getSize()); if (_log.isTraceEnabled()) @@ -494,7 +498,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _extraMessages.decrementAndGet(); } - else if (messageQueue == sub.getPreDeliveryQueue()) + else if (messageQueue == sub.getPreDeliveryQueue() && !sub.isBrowser()) { if (_log.isInfoEnabled()) { @@ -695,7 +699,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + + _log.debug(debugIdentity() + " Message(" + msg.toString() + ") has been taken so disregarding deliver request to Subscriber:" + System.identityHashCode(s)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 20033daac7..d3578d39e8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -256,10 +256,10 @@ public class SubscriptionImpl implements Subscription // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client // received the message. If it is lost in transit that is not important. - if (_acks) - { - channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); - } +// if (_acks) +// { +// channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); +// } if (_sendLock.get()) { @@ -273,41 +273,49 @@ public class SubscriptionImpl implements Subscription private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue) throws AMQException { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. + try + { // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - if (!_acks) - { - if (_logger.isDebugEnabled()) + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!_acks) { - _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + if (_logger.isDebugEnabled()) + { + _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + } + queue.dequeue(storeContext, msg); } - queue.dequeue(storeContext, msg); - } - synchronized (channel) - { - long deliveryTag = channel.getNextDeliveryTag(); - if (_sendLock.get()) + synchronized (channel) { - _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); - } + long deliveryTag = channel.getNextDeliveryTag(); - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - msg.decrementReference(storeContext); - } + if (_sendLock.get()) + { + _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); + } - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + if (_acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + + } + } + finally + { //Only set delivered if it actually was writen successfully.. - // using a try->finally would set it even if an error occured. + // using a try->finally would set it even if an error occured. + // Is this what we want? + msg.setDeliveredToConsumer(); } } @@ -461,14 +469,25 @@ public class SubscriptionImpl implements Subscription public void close() { + boolean closed = false; synchronized (_sendLock) { if (_logger.isDebugEnabled()) { - _logger.debug("Setting SendLock true"); + _logger.debug("Setting SendLock true:" + debugIdentity()); + } + + closed = _sendLock.getAndSet(true); + } + + if (closed) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Called close() on a closed subscription"); } - _sendLock.set(true); + return; } if (_logger.isInfoEnabled()) @@ -488,16 +507,36 @@ public class SubscriptionImpl implements Subscription //remove references in PDQ if (_messages != null) { + if (_logger.isInfoEnabled()) + { + _logger.info("Clearing PDQ (" + debugIdentity() + "):" + this); + } + _messages.clear(); } + } + + private void autoclose() + { + close(); if (_autoClose && !_sentClose) { - _logger.info("Closing autoclose subscription:" + this); + _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this); + ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter(); converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag); - _sentClose = true; + + //fixme JIRA do this better + try + { + channel.unsubscribeConsumer(protocolSession, consumerTag); + } + catch (AMQException e) + { + // Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag. + } } } @@ -590,7 +629,7 @@ public class SubscriptionImpl implements Subscription { if (_messages.isEmpty()) { - close(); + autoclose(); return null; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index e5cce672f6..cf0da55f2a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -100,7 +100,7 @@ public class LocalTransactionalContext implements TransactionalContext // be added for every queue onto which the message is // enqueued. Finally a cleanup op will be added to decrement // the reference associated with the routing. - message.incrementReference(); +// message.incrementReference(); _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst)); _messageDelivered = true; /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue)); diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 19146da22e..181dfa3a80 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -93,7 +93,6 @@ public class NonTransactionalContext implements TransactionalContext { try { - message.incrementReference(); queue.process(_storeContext, message, deliverFirst); //following check implements the functionality //required by the 'immediate' flag: @@ -128,6 +127,8 @@ public class NonTransactionalContext implements TransactionalContext { _log.debug("Discarding message: " + message.message.getMessageId()); } + + //Message has been ack so discard it. This will dequeue and decrement the reference. message.discard(_storeContext); } else @@ -160,6 +161,8 @@ public class NonTransactionalContext implements TransactionalContext { _log.debug("Discarding message: " + msg.message.getMessageId()); } + + //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(_storeContext); } else @@ -181,7 +184,22 @@ public class NonTransactionalContext implements TransactionalContext throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId()); } - msg.discard(_storeContext); + + if (!_browsedAcks.contains(deliveryTag)) + { + if (_log.isDebugEnabled()) + { + _log.debug("Discarding message: " + msg.message.getMessageId()); + } + + //Message has been ack so discard it. This will dequeue and decrement the reference. + msg.discard(_storeContext); + } + else + { + _browsedAcks.remove(deliveryTag); + } + if (_log.isDebugEnabled()) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java index d04b93a469..339ca8ae1a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java @@ -27,10 +27,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; -/** - * Holds a list of TxnOp instance representing transactional - * operations. - */ +/** Holds a list of TxnOp instance representing transactional operations. */ public class TxnBuffer { private final List<TxnOp> _ops = new ArrayList<TxnOp>(); @@ -42,6 +39,11 @@ public class TxnBuffer public void commit(StoreContext context) throws AMQException { + if (_log.isDebugEnabled()) + { + _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops.toArray()); + } + if (prepare(context)) { for (TxnOp op : _ops) @@ -64,7 +66,7 @@ public class TxnBuffer catch (Exception e) { //compensate previously prepared ops - for(int j = 0; j < i; j++) + for (int j = 0; j < i; j++) { _ops.get(j).undoPrepare(); } |