summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java114
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java109
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java12
17 files changed, 290 insertions, 114 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 5dd6619cff..1ebe5fa0a2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -319,6 +319,25 @@ public class AMQChannel
public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug(message);
+
+ return true;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
+ }
+
AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
if (q != null)
{
@@ -342,9 +361,23 @@ public class AMQChannel
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
{
- _log.info("Unsubscribing all consumers on channel " + toString());
+ if (_log.isInfoEnabled())
+ {
+ if (!_consumerTag2QueueMap.isEmpty())
+ {
+ _log.info("Unsubscribing all consumers on channel " + toString());
+ }
+ else
+ {
+ _log.info("No consumers to unsubscribe on channel " + toString());
+ }
+ }
for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
{
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
+ }
me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
}
_consumerTag2QueueMap.clear();
@@ -369,7 +402,11 @@ public class AMQChannel
}
else
{
- _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity());
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag +
+ ") with a queue(" + queue + ") for " + consumerTag);
+ }
}
}
@@ -395,25 +432,38 @@ public class AMQChannel
*/
public void requeue() throws AMQException
{
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Requeuing for " + toString());
+ }
+
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ if (_log.isDebugEnabled())
+ {
+ _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages.");
+ }
// Deliver these messages out of the transaction as their delivery was never
// part of the transaction only the receive.
- TransactionalContext deliveryContext;
- if (!(_txnContext instanceof NonTransactionalContext))
+ TransactionalContext deliveryContext = null;
+
+ if (!messagesToBeDelivered.isEmpty())
{
- if (_nonTransactedContext == null)
+ if (!(_txnContext instanceof NonTransactionalContext))
{
- _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
- }
+// if (_nonTransactedContext == null)
+ {
+ _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
- deliveryContext = _nonTransactedContext;
- }
- else
- {
- deliveryContext = _txnContext;
+ deliveryContext = _nonTransactedContext;
+ }
+ else
+ {
+ deliveryContext = _txnContext;
+ }
}
@@ -421,6 +471,10 @@ public class AMQChannel
{
if (unacked.queue != null)
{
+ // Ensure message is released for redelivery
+ unacked.message.release();
+
+ // Mark message redelivered
unacked.message.setRedelivered(true);
// Deliver Message
@@ -459,7 +513,7 @@ public class AMQChannel
TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- if (_nonTransactedContext == null)
+// if (_nonTransactedContext == null)
{
_nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
_returnMessages, _browsedAcks);
@@ -472,13 +526,12 @@ public class AMQChannel
deliveryContext = _txnContext;
}
-
if (unacked.queue != null)
{
//Redeliver the messages to the front of the queue
deliveryContext.deliver(unacked.message, unacked.queue, true);
-
- unacked.message.decrementReference(_storeContext);
+ //Deliver increments the message count but we have already deliverted this once so don't increment it again
+ // this was because deliver did an increment changed this.
}
else
{
@@ -489,7 +542,6 @@ public class AMQChannel
//
// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
//
-// unacked.message.decrementReference(_storeContext);
}
}
else
@@ -656,15 +708,16 @@ public class AMQChannel
}
sub.addToResendQueue(msg);
_unacknowledgedMessageMap.remove(message.deliveryTag);
- // Don't decrement as we are bypassing the normal deliver which increments
- // this is why there is a decrement on the Requeue as deliver will increment.
- // msg.decrementReference(_storeContext);
}
} // sync(sub.getSendLock)
}
else
{
- _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss");
+ }
//move this message to requeue
msgToRequeue.add(message);
}
@@ -706,7 +759,6 @@ public class AMQChannel
deliveryContext.deliver(message.message, message.queue, true);
_unacknowledgedMessageMap.remove(message.deliveryTag);
- message.message.decrementReference(_storeContext);
}
}
@@ -760,8 +812,18 @@ public class AMQChannel
{
synchronized (_unacknowledgedMessageMap.getLock())
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size());
+ }
+
_unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
checkSuspension();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size());
+ }
+
}
}
@@ -775,12 +837,6 @@ public class AMQChannel
return _unacknowledgedMessageMap;
}
- public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
- {
- _browsedAcks.add(deliveryTag);
- addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- }
-
private void checkSuspension()
{
boolean suspend;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 820f0122f5..fb16267d97 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -37,6 +37,10 @@ public abstract class RequiredDeliveryException extends AMQException
{
super(message);
_amqMessage = payload;
+ // Increment the reference as this message is in the routing phase
+ // and so will have the ref decremented as routing fails.
+ // we need to keep this message around so we can return it in the
+ // handler. So increment here.
payload.incrementReference();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index c987c12154..aac9408247 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -101,6 +101,8 @@ public class TxAck implements TxnOp
for (UnacknowledgedMessage msg : _unacked)
{
msg.restoreTransientMessageData();
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
}
}
@@ -124,7 +126,7 @@ public class TxAck implements TxnOp
_map.remove(_unacked);
for (UnacknowledgedMessage msg : _unacked)
{
- msg.clearTransientMessageData();
+ msg.clearTransientMessageData();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index 940b5b2bf1..b8c5e821f7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -39,7 +39,6 @@ public class UnacknowledgedMessage
this.message = message;
this.consumerTag = consumerTag;
this.deliveryTag = deliveryTag;
- message.incrementReference();
}
public String toString()
@@ -63,6 +62,7 @@ public class UnacknowledgedMessage
{
message.dequeue(storeContext, queue);
}
+ //if the queue is null then the message is waiting to be acked, but has been removed.
message.decrementReference(storeContext);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index 7d18043f5c..8bab96a11b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -29,9 +29,12 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody>
{
+ private static final Logger _log = Logger.getLogger(BasicCancelMethodHandler.class);
+
private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler();
public static BasicCancelMethodHandler getInstance()
@@ -55,6 +58,12 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
throw body.getChannelNotFoundException(evt.getChannelId());
}
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("BasicCancel: for:" + body.consumerTag +
+ " nowait:" + body.nowait);
+ }
+
channel.unsubscribeConsumer(protocolSession, body.consumerTag);
if (!body.nowait)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index da61f2ffd5..56eae279dc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
@@ -67,12 +69,22 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
else
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("BasicConsume: from '" + body.queue +
+ "' for:" + body.consumerTag +
+ " nowait:" + body.nowait +
+ " args:" + body.arguments);
+ }
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
- _log.info("No queue for '" + body.queue + "'");
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("No queue for '" + body.queue + "'");
+ }
if (body.queue != null)
{
String msg = "No such queue, '" + body.queue + "'";
@@ -105,14 +117,34 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
catch (org.apache.qpid.AMQInvalidArgumentException ise)
{
- _log.info("Closing connection due to invalid selector");
- throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage());
+ _log.debug("Closing connection due to invalid selector");
+ // Why doesn't this ChannelException work.
+// throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId
+ BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId
+ AMQConstant.INVALID_ARGUMENT.getCode(), // replyCode
+ new AMQShortString(ise.getMessage()))); // replyText
}
catch (ConsumerTagNotUniqueException e)
{
AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Non-unique consumer tag, '" + body.consumerTag + "'");
+ // If the above doesn't work then perhaps this is wrong too.
+// throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+// "Non-unique consumer tag, '" + body.consumerTag + "'");
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index 4e77a5e8b9..14687c40ae 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -52,13 +52,13 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
int channelId = evt.getChannelId();
- if (_logger.isTraceEnabled())
- {
- _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
- ": Requeue:" + evt.getMethod().requeue +
-// ": Resend:" + evt.getMethod().resend +
- " on channel:" + channelId);
- }
+// if (_logger.isDebugEnabled())
+// {
+// _logger.debug("Rejecting:" + evt.getMethod().deliveryTag +
+// ": Requeue:" + evt.getMethod().requeue +
+//// ": Resend:" + evt.getMethod().resend +
+// " on channel:" + channelId);
+// }
AMQChannel channel = session.getChannel(channelId);
@@ -67,9 +67,9 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
throw evt.getMethod().getChannelNotFoundException(channelId);
}
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
+ _logger.debug("Rejecting:" + evt.getMethod().deliveryTag +
": Requeue:" + evt.getMethod().requeue +
// ": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 777784ca30..1f4f1f9221 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -51,8 +51,11 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
{
AMQProtocolSession session = stateManager.getProtocolSession();
ChannelCloseBody body = evt.getMethod();
- _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
- " and method " + body.methodId);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
+ " and method " + body.methodId);
+ }
int channelId = evt.getChannelId();
AMQChannel channel = session.getChannel(channelId);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index 21da03d226..b086cad67f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
{
private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
@@ -49,8 +49,11 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
{
AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionCloseBody body = evt.getMethod();
- _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
- body.replyText + " for " + session);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
+ body.replyText + " for " + session);
+ }
try
{
session.closeSession();
@@ -62,7 +65,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
+ final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 03c7051aac..d8b7814d31 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -176,6 +176,8 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
}
else
{
+ _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
+
// Be aware of possible changes to parameter order as versions change.
protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
session.getProtocolMajorVersion(),
@@ -185,7 +187,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
200, // replyCode
new AMQShortString(throwable.getMessage()) // replyText
));
- _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
protocolSession.close();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 6d375c89fe..cdf316f2d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -45,9 +45,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -92,9 +90,10 @@ public class AMQMessage
return _taken.get();
}
+ private final int hashcode = System.identityHashCode(this);
public String debugIdentity()
{
- return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")";
+ return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
/**
@@ -206,7 +205,7 @@ public class AMQMessage
_taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
- _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId);
+ _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")");
}
}
@@ -363,7 +362,7 @@ public class AMQMessage
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
+ _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
}
@@ -374,6 +373,7 @@ public class AMQMessage
*
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
+ * @param storeContext
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
@@ -387,9 +387,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
-
-
+ _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
// must check if the handle is null since there may be cases where we decide to throw away a message
@@ -410,7 +408,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
+ _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
if (_referenceCount.get() < 0)
{
Thread.dumpStack();
@@ -418,7 +416,7 @@ public class AMQMessage
}
if (_referenceCount.get() < 0)
{
- throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
+ throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
}
}
}
@@ -459,7 +457,10 @@ public class AMQMessage
public void release()
{
- _log.trace("Releasing Message:" + debugIdentity());
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Releasing Message:" + debugIdentity());
+ }
_taken.set(false);
_takenBySubcription = null;
}
@@ -572,7 +573,7 @@ public class AMQMessage
List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
if (_log.isDebugEnabled())
{
- _log.debug("Delivering message " + _messageId + " to " + destinationQueues);
+ _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
}
try
{
@@ -589,6 +590,8 @@ public class AMQMessage
for (AMQQueue q : destinationQueues)
{
+ //Increment the references to this message for each queue delivery.
+ incrementReference();
//normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
@@ -596,6 +599,7 @@ public class AMQMessage
finally
{
destinationQueues.clear();
+ // Remove refence for routing process . Reference count should now == delivered queue count
decrementReference(storeContext);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 7c2fe73386..78f144703b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -516,7 +516,7 @@ public class AMQQueue implements Managable, Comparable
{
if (_logger.isInfoEnabled())
{
- _logger.warn("Auto-deleteing queue:" + this);
+ _logger.info("Auto-deleteing queue:" + this);
}
autodelete();
// we need to manually fire the event to the removed subscription (which was the last one left for this
@@ -624,7 +624,6 @@ public class AMQQueue implements Managable, Comparable
try
{
msg.dequeue(storeContext, this);
- msg.decrementReference(storeContext);
}
catch (MessageCleanupException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 87868f0b25..6122d191f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -383,6 +383,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return count;
}
+ /**
+ This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
+ */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
@@ -392,13 +395,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
AMQMessage message = messages.peek();
- while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub)))
+ //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.)
+ while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub))
{
//remove the already taken message
AMQMessage removed = messages.poll();
assert removed == message;
-
+
_totalMessageSize.addAndGet(-message.getSize());
if (_log.isTraceEnabled())
@@ -494,7 +498,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_extraMessages.decrementAndGet();
}
- else if (messageQueue == sub.getPreDeliveryQueue())
+ else if (messageQueue == sub.getPreDeliveryQueue() && !sub.isBrowser())
{
if (_log.isInfoEnabled())
{
@@ -695,7 +699,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() +
+ _log.debug(debugIdentity() + " Message(" + msg.toString() +
") has been taken so disregarding deliver request to Subscriber:" +
System.identityHashCode(s));
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 20033daac7..d3578d39e8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -256,10 +256,10 @@ public class SubscriptionImpl implements Subscription
// We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
// received the message. If it is lost in transit that is not important.
- if (_acks)
- {
- channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
- }
+// if (_acks)
+// {
+// channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+// }
if (_sendLock.get())
{
@@ -273,41 +273,49 @@ public class SubscriptionImpl implements Subscription
private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
throws AMQException
{
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
+ try
+ { // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!_acks)
- {
- if (_logger.isDebugEnabled())
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
{
- _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ }
+ queue.dequeue(storeContext, msg);
}
- queue.dequeue(storeContext, msg);
- }
- synchronized (channel)
- {
- long deliveryTag = channel.getNextDeliveryTag();
- if (_sendLock.get())
+ synchronized (channel)
{
- _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
- }
+ long deliveryTag = channel.getNextDeliveryTag();
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- msg.decrementReference(storeContext);
- }
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+ }
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+
+ }
+ }
+ finally
+ {
//Only set delivered if it actually was writen successfully..
- // using a try->finally would set it even if an error occured.
+ // using a try->finally would set it even if an error occured.
+ // Is this what we want?
+
msg.setDeliveredToConsumer();
}
}
@@ -461,14 +469,25 @@ public class SubscriptionImpl implements Subscription
public void close()
{
+ boolean closed = false;
synchronized (_sendLock)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Setting SendLock true");
+ _logger.debug("Setting SendLock true:" + debugIdentity());
+ }
+
+ closed = _sendLock.getAndSet(true);
+ }
+
+ if (closed)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Called close() on a closed subscription");
}
- _sendLock.set(true);
+ return;
}
if (_logger.isInfoEnabled())
@@ -488,16 +507,36 @@ public class SubscriptionImpl implements Subscription
//remove references in PDQ
if (_messages != null)
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Clearing PDQ (" + debugIdentity() + "):" + this);
+ }
+
_messages.clear();
}
+ }
+
+ private void autoclose()
+ {
+ close();
if (_autoClose && !_sentClose)
{
- _logger.info("Closing autoclose subscription:" + this);
+ _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this);
+
ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
-
_sentClose = true;
+
+ //fixme JIRA do this better
+ try
+ {
+ channel.unsubscribeConsumer(protocolSession, consumerTag);
+ }
+ catch (AMQException e)
+ {
+ // Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag.
+ }
}
}
@@ -590,7 +629,7 @@ public class SubscriptionImpl implements Subscription
{
if (_messages.isEmpty())
{
- close();
+ autoclose();
return null;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index e5cce672f6..cf0da55f2a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -100,7 +100,7 @@ public class LocalTransactionalContext implements TransactionalContext
// be added for every queue onto which the message is
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
- message.incrementReference();
+// message.incrementReference();
_postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
_messageDelivered = true;
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 19146da22e..181dfa3a80 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -93,7 +93,6 @@ public class NonTransactionalContext implements TransactionalContext
{
try
{
- message.incrementReference();
queue.process(_storeContext, message, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
@@ -128,6 +127,8 @@ public class NonTransactionalContext implements TransactionalContext
{
_log.debug("Discarding message: " + message.message.getMessageId());
}
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
message.discard(_storeContext);
}
else
@@ -160,6 +161,8 @@ public class NonTransactionalContext implements TransactionalContext
{
_log.debug("Discarding message: " + msg.message.getMessageId());
}
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
}
else
@@ -181,7 +184,22 @@ public class NonTransactionalContext implements TransactionalContext
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
_channel.getChannelId());
}
- msg.discard(_storeContext);
+
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + msg.message.getMessageId());
+ }
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ msg.discard(_storeContext);
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
+
if (_log.isDebugEnabled())
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
index d04b93a469..339ca8ae1a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
@@ -27,10 +27,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
-/**
- * Holds a list of TxnOp instance representing transactional
- * operations.
- */
+/** Holds a list of TxnOp instance representing transactional operations. */
public class TxnBuffer
{
private final List<TxnOp> _ops = new ArrayList<TxnOp>();
@@ -42,6 +39,11 @@ public class TxnBuffer
public void commit(StoreContext context) throws AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops.toArray());
+ }
+
if (prepare(context))
{
for (TxnOp op : _ops)
@@ -64,7 +66,7 @@ public class TxnBuffer
catch (Exception e)
{
//compensate previously prepared ops
- for(int j = 0; j < i; j++)
+ for (int j = 0; j < i; j++)
{
_ops.get(j).undoPrepare();
}