summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
commitc4f018f7c10d2169ced4c59e776844ee5cf52d33 (patch)
tree421d85cf41bf382bab618587298d9d6bef825bdc /java/broker/src/main/java/org/apache
parent685ad5615e73f02f76f69841162fb9aa126892d2 (diff)
downloadqpid-python-c4f018f7c10d2169ced4c59e776844ee5cf52d33.tar.gz
QPID-346,QPID-386,QPID-403, QPID-410 Rollback, Basic-Reject, QueueBrowser NO_ACK.
QPID-346 Message loss after rollback\recover QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state. QPID-403 Implement Basic.Reject QPID-410 Queue Browsers should use not acknowledge messages. ------------------------------------- Broker TxAck - Added comment and fixed white space UnacknowledgedMessage - Added comment for messageDecrement AMQChannel - Added extra debugging. + Created a NonTransactionalContext for requeuing messages as using txContext will tie the requeue to any runing transaction. + Updated message reference counting. So it is in terms of queues don't increment when giving to client. BasicCancelMethodHandler - Added Debug log. BasicConsumeMethodHandler - Reverted to directly writes frames to the session, throwing ChannelException caused problems. Added Trace and debug logging. BasicRejectMethodHandler, ChannelCloseHandler, ConnectionCloseMethodHandler - Added Debug logging AMQPFastProtocolHandler - moved error log to before session.write AMQMessage - Added additional debug via debugIdentity() and comments AMQQueue - Decoupled reference counting from dequeue operation. ConcurrentSelectorDeliveryManager - Added comments and increased info in debug logging SubscriptionImpl - Disabled use of acks for browsers. For now put setDeliveredToConsumer back in the finally block. commented that I'm not sure this is correct as even an error writing to client will cause msg to be marked delivered to consumer. + On Close ensured that it is only called once. + Had problem where closing browser was causing two CancelOk frames to be sent back to client. RequiredDeliveryException - Added comment to explain incrementReference LocalTransactionalContext - Commented out incrementReference as it shouldn't be required here. NonTransactionalContext - Removed incrementReference on deliver + - Fixed bug where browsers - acks would cause messages to be discarded. new JIRA this needs tidied up. TxnBuffer - Added debug logging. Client ------ AMQQueueBrowser - Added comments AMQSession - Added comments and debug + Updated to cause closed consumer to reject messages rather than receive them. + Prevented NoConsumer's from rollingback and rejecting.. they simply clear their SyncQueue - JIRA to ensure clean state with rollback BasicMessageConsumer - Added trace level debuging on close calls + Forced noConsume-rs to use NO_ACK + added more logging Closeable - Updated to use isClosed rather than directly calling _closed.get() to aid in future work on ensuring multi threaded close still allows pending acks to be processed first. ChannelCloseOkMethodHandler - updated comment AMQProtocolSession - Update comments,whitespace TransportConnection - removed static block FlowControllingBlockingQueue - Added isEmpty() Method PropertyValueTest - Added VM Broker setup + Updated test to run once and 50 times to pull out delivery tag problems that were occuring. + Adjusted logging level to be more helpful. moved some info down to trace and debug. MessageRequeueTest - Moved QpidClientConnection its own file. + Fixed it so it actually runs more than one consumer, concurrently.Now 3 was 1. ConcurrentLinkedMessageQueueAtomicSize - Implemented iterator(). Added QueueBrowserTest to system tests to test QueueBrowsering. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@517638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java114
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java109
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java12
17 files changed, 290 insertions, 114 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 5dd6619cff..1ebe5fa0a2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -319,6 +319,25 @@ public class AMQChannel
public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug(message);
+
+ return true;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
+ }
+
AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
if (q != null)
{
@@ -342,9 +361,23 @@ public class AMQChannel
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
{
- _log.info("Unsubscribing all consumers on channel " + toString());
+ if (_log.isInfoEnabled())
+ {
+ if (!_consumerTag2QueueMap.isEmpty())
+ {
+ _log.info("Unsubscribing all consumers on channel " + toString());
+ }
+ else
+ {
+ _log.info("No consumers to unsubscribe on channel " + toString());
+ }
+ }
for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
{
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
+ }
me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
}
_consumerTag2QueueMap.clear();
@@ -369,7 +402,11 @@ public class AMQChannel
}
else
{
- _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity());
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag +
+ ") with a queue(" + queue + ") for " + consumerTag);
+ }
}
}
@@ -395,25 +432,38 @@ public class AMQChannel
*/
public void requeue() throws AMQException
{
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Requeuing for " + toString());
+ }
+
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ if (_log.isDebugEnabled())
+ {
+ _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages.");
+ }
// Deliver these messages out of the transaction as their delivery was never
// part of the transaction only the receive.
- TransactionalContext deliveryContext;
- if (!(_txnContext instanceof NonTransactionalContext))
+ TransactionalContext deliveryContext = null;
+
+ if (!messagesToBeDelivered.isEmpty())
{
- if (_nonTransactedContext == null)
+ if (!(_txnContext instanceof NonTransactionalContext))
{
- _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
- }
+// if (_nonTransactedContext == null)
+ {
+ _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
- deliveryContext = _nonTransactedContext;
- }
- else
- {
- deliveryContext = _txnContext;
+ deliveryContext = _nonTransactedContext;
+ }
+ else
+ {
+ deliveryContext = _txnContext;
+ }
}
@@ -421,6 +471,10 @@ public class AMQChannel
{
if (unacked.queue != null)
{
+ // Ensure message is released for redelivery
+ unacked.message.release();
+
+ // Mark message redelivered
unacked.message.setRedelivered(true);
// Deliver Message
@@ -459,7 +513,7 @@ public class AMQChannel
TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- if (_nonTransactedContext == null)
+// if (_nonTransactedContext == null)
{
_nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
_returnMessages, _browsedAcks);
@@ -472,13 +526,12 @@ public class AMQChannel
deliveryContext = _txnContext;
}
-
if (unacked.queue != null)
{
//Redeliver the messages to the front of the queue
deliveryContext.deliver(unacked.message, unacked.queue, true);
-
- unacked.message.decrementReference(_storeContext);
+ //Deliver increments the message count but we have already deliverted this once so don't increment it again
+ // this was because deliver did an increment changed this.
}
else
{
@@ -489,7 +542,6 @@ public class AMQChannel
//
// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
//
-// unacked.message.decrementReference(_storeContext);
}
}
else
@@ -656,15 +708,16 @@ public class AMQChannel
}
sub.addToResendQueue(msg);
_unacknowledgedMessageMap.remove(message.deliveryTag);
- // Don't decrement as we are bypassing the normal deliver which increments
- // this is why there is a decrement on the Requeue as deliver will increment.
- // msg.decrementReference(_storeContext);
}
} // sync(sub.getSendLock)
}
else
{
- _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss");
+ }
//move this message to requeue
msgToRequeue.add(message);
}
@@ -706,7 +759,6 @@ public class AMQChannel
deliveryContext.deliver(message.message, message.queue, true);
_unacknowledgedMessageMap.remove(message.deliveryTag);
- message.message.decrementReference(_storeContext);
}
}
@@ -760,8 +812,18 @@ public class AMQChannel
{
synchronized (_unacknowledgedMessageMap.getLock())
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size());
+ }
+
_unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
checkSuspension();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size());
+ }
+
}
}
@@ -775,12 +837,6 @@ public class AMQChannel
return _unacknowledgedMessageMap;
}
- public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
- {
- _browsedAcks.add(deliveryTag);
- addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- }
-
private void checkSuspension()
{
boolean suspend;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 820f0122f5..fb16267d97 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -37,6 +37,10 @@ public abstract class RequiredDeliveryException extends AMQException
{
super(message);
_amqMessage = payload;
+ // Increment the reference as this message is in the routing phase
+ // and so will have the ref decremented as routing fails.
+ // we need to keep this message around so we can return it in the
+ // handler. So increment here.
payload.incrementReference();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index c987c12154..aac9408247 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -101,6 +101,8 @@ public class TxAck implements TxnOp
for (UnacknowledgedMessage msg : _unacked)
{
msg.restoreTransientMessageData();
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
}
}
@@ -124,7 +126,7 @@ public class TxAck implements TxnOp
_map.remove(_unacked);
for (UnacknowledgedMessage msg : _unacked)
{
- msg.clearTransientMessageData();
+ msg.clearTransientMessageData();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index 940b5b2bf1..b8c5e821f7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -39,7 +39,6 @@ public class UnacknowledgedMessage
this.message = message;
this.consumerTag = consumerTag;
this.deliveryTag = deliveryTag;
- message.incrementReference();
}
public String toString()
@@ -63,6 +62,7 @@ public class UnacknowledgedMessage
{
message.dequeue(storeContext, queue);
}
+ //if the queue is null then the message is waiting to be acked, but has been removed.
message.decrementReference(storeContext);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index 7d18043f5c..8bab96a11b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -29,9 +29,12 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody>
{
+ private static final Logger _log = Logger.getLogger(BasicCancelMethodHandler.class);
+
private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler();
public static BasicCancelMethodHandler getInstance()
@@ -55,6 +58,12 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
throw body.getChannelNotFoundException(evt.getChannelId());
}
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("BasicCancel: for:" + body.consumerTag +
+ " nowait:" + body.nowait);
+ }
+
channel.unsubscribeConsumer(protocolSession, body.consumerTag);
if (!body.nowait)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index da61f2ffd5..56eae279dc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
@@ -67,12 +69,22 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
else
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("BasicConsume: from '" + body.queue +
+ "' for:" + body.consumerTag +
+ " nowait:" + body.nowait +
+ " args:" + body.arguments);
+ }
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
- _log.info("No queue for '" + body.queue + "'");
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("No queue for '" + body.queue + "'");
+ }
if (body.queue != null)
{
String msg = "No such queue, '" + body.queue + "'";
@@ -105,14 +117,34 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
catch (org.apache.qpid.AMQInvalidArgumentException ise)
{
- _log.info("Closing connection due to invalid selector");
- throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage());
+ _log.debug("Closing connection due to invalid selector");
+ // Why doesn't this ChannelException work.
+// throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId
+ BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId
+ AMQConstant.INVALID_ARGUMENT.getCode(), // replyCode
+ new AMQShortString(ise.getMessage()))); // replyText
}
catch (ConsumerTagNotUniqueException e)
{
AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Non-unique consumer tag, '" + body.consumerTag + "'");
+ // If the above doesn't work then perhaps this is wrong too.
+// throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+// "Non-unique consumer tag, '" + body.consumerTag + "'");
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index 4e77a5e8b9..14687c40ae 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -52,13 +52,13 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
int channelId = evt.getChannelId();
- if (_logger.isTraceEnabled())
- {
- _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
- ": Requeue:" + evt.getMethod().requeue +
-// ": Resend:" + evt.getMethod().resend +
- " on channel:" + channelId);
- }
+// if (_logger.isDebugEnabled())
+// {
+// _logger.debug("Rejecting:" + evt.getMethod().deliveryTag +
+// ": Requeue:" + evt.getMethod().requeue +
+//// ": Resend:" + evt.getMethod().resend +
+// " on channel:" + channelId);
+// }
AMQChannel channel = session.getChannel(channelId);
@@ -67,9 +67,9 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
throw evt.getMethod().getChannelNotFoundException(channelId);
}
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
+ _logger.debug("Rejecting:" + evt.getMethod().deliveryTag +
": Requeue:" + evt.getMethod().requeue +
// ": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 777784ca30..1f4f1f9221 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -51,8 +51,11 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
{
AMQProtocolSession session = stateManager.getProtocolSession();
ChannelCloseBody body = evt.getMethod();
- _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
- " and method " + body.methodId);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
+ " and method " + body.methodId);
+ }
int channelId = evt.getChannelId();
AMQChannel channel = session.getChannel(channelId);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index 21da03d226..b086cad67f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
{
private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
@@ -49,8 +49,11 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
{
AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionCloseBody body = evt.getMethod();
- _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
- body.replyText + " for " + session);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
+ body.replyText + " for " + session);
+ }
try
{
session.closeSession();
@@ -62,7 +65,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
+ final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 03c7051aac..d8b7814d31 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -176,6 +176,8 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
}
else
{
+ _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
+
// Be aware of possible changes to parameter order as versions change.
protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
session.getProtocolMajorVersion(),
@@ -185,7 +187,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
200, // replyCode
new AMQShortString(throwable.getMessage()) // replyText
));
- _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
protocolSession.close();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 6d375c89fe..cdf316f2d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -45,9 +45,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -92,9 +90,10 @@ public class AMQMessage
return _taken.get();
}
+ private final int hashcode = System.identityHashCode(this);
public String debugIdentity()
{
- return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")";
+ return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
/**
@@ -206,7 +205,7 @@ public class AMQMessage
_taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
- _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId);
+ _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")");
}
}
@@ -363,7 +362,7 @@ public class AMQMessage
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
+ _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
}
@@ -374,6 +373,7 @@ public class AMQMessage
*
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
+ * @param storeContext
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
@@ -387,9 +387,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
-
-
+ _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
// must check if the handle is null since there may be cases where we decide to throw away a message
@@ -410,7 +408,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
+ _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
if (_referenceCount.get() < 0)
{
Thread.dumpStack();
@@ -418,7 +416,7 @@ public class AMQMessage
}
if (_referenceCount.get() < 0)
{
- throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
+ throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
}
}
}
@@ -459,7 +457,10 @@ public class AMQMessage
public void release()
{
- _log.trace("Releasing Message:" + debugIdentity());
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Releasing Message:" + debugIdentity());
+ }
_taken.set(false);
_takenBySubcription = null;
}
@@ -572,7 +573,7 @@ public class AMQMessage
List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
if (_log.isDebugEnabled())
{
- _log.debug("Delivering message " + _messageId + " to " + destinationQueues);
+ _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
}
try
{
@@ -589,6 +590,8 @@ public class AMQMessage
for (AMQQueue q : destinationQueues)
{
+ //Increment the references to this message for each queue delivery.
+ incrementReference();
//normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
@@ -596,6 +599,7 @@ public class AMQMessage
finally
{
destinationQueues.clear();
+ // Remove refence for routing process . Reference count should now == delivered queue count
decrementReference(storeContext);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 7c2fe73386..78f144703b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -516,7 +516,7 @@ public class AMQQueue implements Managable, Comparable
{
if (_logger.isInfoEnabled())
{
- _logger.warn("Auto-deleteing queue:" + this);
+ _logger.info("Auto-deleteing queue:" + this);
}
autodelete();
// we need to manually fire the event to the removed subscription (which was the last one left for this
@@ -624,7 +624,6 @@ public class AMQQueue implements Managable, Comparable
try
{
msg.dequeue(storeContext, this);
- msg.decrementReference(storeContext);
}
catch (MessageCleanupException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 87868f0b25..6122d191f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -383,6 +383,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return count;
}
+ /**
+ This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
+ */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
@@ -392,13 +395,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
AMQMessage message = messages.peek();
- while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub)))
+ //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.)
+ while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub))
{
//remove the already taken message
AMQMessage removed = messages.poll();
assert removed == message;
-
+
_totalMessageSize.addAndGet(-message.getSize());
if (_log.isTraceEnabled())
@@ -494,7 +498,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_extraMessages.decrementAndGet();
}
- else if (messageQueue == sub.getPreDeliveryQueue())
+ else if (messageQueue == sub.getPreDeliveryQueue() && !sub.isBrowser())
{
if (_log.isInfoEnabled())
{
@@ -695,7 +699,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() +
+ _log.debug(debugIdentity() + " Message(" + msg.toString() +
") has been taken so disregarding deliver request to Subscriber:" +
System.identityHashCode(s));
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 20033daac7..d3578d39e8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -256,10 +256,10 @@ public class SubscriptionImpl implements Subscription
// We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
// received the message. If it is lost in transit that is not important.
- if (_acks)
- {
- channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
- }
+// if (_acks)
+// {
+// channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+// }
if (_sendLock.get())
{
@@ -273,41 +273,49 @@ public class SubscriptionImpl implements Subscription
private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
throws AMQException
{
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
+ try
+ { // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!_acks)
- {
- if (_logger.isDebugEnabled())
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
{
- _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ }
+ queue.dequeue(storeContext, msg);
}
- queue.dequeue(storeContext, msg);
- }
- synchronized (channel)
- {
- long deliveryTag = channel.getNextDeliveryTag();
- if (_sendLock.get())
+ synchronized (channel)
{
- _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
- }
+ long deliveryTag = channel.getNextDeliveryTag();
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- msg.decrementReference(storeContext);
- }
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+ }
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+
+ }
+ }
+ finally
+ {
//Only set delivered if it actually was writen successfully..
- // using a try->finally would set it even if an error occured.
+ // using a try->finally would set it even if an error occured.
+ // Is this what we want?
+
msg.setDeliveredToConsumer();
}
}
@@ -461,14 +469,25 @@ public class SubscriptionImpl implements Subscription
public void close()
{
+ boolean closed = false;
synchronized (_sendLock)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Setting SendLock true");
+ _logger.debug("Setting SendLock true:" + debugIdentity());
+ }
+
+ closed = _sendLock.getAndSet(true);
+ }
+
+ if (closed)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Called close() on a closed subscription");
}
- _sendLock.set(true);
+ return;
}
if (_logger.isInfoEnabled())
@@ -488,16 +507,36 @@ public class SubscriptionImpl implements Subscription
//remove references in PDQ
if (_messages != null)
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Clearing PDQ (" + debugIdentity() + "):" + this);
+ }
+
_messages.clear();
}
+ }
+
+ private void autoclose()
+ {
+ close();
if (_autoClose && !_sentClose)
{
- _logger.info("Closing autoclose subscription:" + this);
+ _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this);
+
ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
-
_sentClose = true;
+
+ //fixme JIRA do this better
+ try
+ {
+ channel.unsubscribeConsumer(protocolSession, consumerTag);
+ }
+ catch (AMQException e)
+ {
+ // Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag.
+ }
}
}
@@ -590,7 +629,7 @@ public class SubscriptionImpl implements Subscription
{
if (_messages.isEmpty())
{
- close();
+ autoclose();
return null;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index e5cce672f6..cf0da55f2a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -100,7 +100,7 @@ public class LocalTransactionalContext implements TransactionalContext
// be added for every queue onto which the message is
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
- message.incrementReference();
+// message.incrementReference();
_postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
_messageDelivered = true;
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 19146da22e..181dfa3a80 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -93,7 +93,6 @@ public class NonTransactionalContext implements TransactionalContext
{
try
{
- message.incrementReference();
queue.process(_storeContext, message, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
@@ -128,6 +127,8 @@ public class NonTransactionalContext implements TransactionalContext
{
_log.debug("Discarding message: " + message.message.getMessageId());
}
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
message.discard(_storeContext);
}
else
@@ -160,6 +161,8 @@ public class NonTransactionalContext implements TransactionalContext
{
_log.debug("Discarding message: " + msg.message.getMessageId());
}
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
}
else
@@ -181,7 +184,22 @@ public class NonTransactionalContext implements TransactionalContext
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
_channel.getChannelId());
}
- msg.discard(_storeContext);
+
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + msg.message.getMessageId());
+ }
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ msg.discard(_storeContext);
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
+
if (_log.isDebugEnabled())
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
index d04b93a469..339ca8ae1a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
@@ -27,10 +27,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
-/**
- * Holds a list of TxnOp instance representing transactional
- * operations.
- */
+/** Holds a list of TxnOp instance representing transactional operations. */
public class TxnBuffer
{
private final List<TxnOp> _ops = new ArrayList<TxnOp>();
@@ -42,6 +39,11 @@ public class TxnBuffer
public void commit(StoreContext context) throws AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops.toArray());
+ }
+
if (prepare(context))
{
for (TxnOp op : _ops)
@@ -64,7 +66,7 @@ public class TxnBuffer
catch (Exception e)
{
//compensate previously prepared ops
- for(int j = 0; j < i; j++)
+ for (int j = 0; j < i; j++)
{
_ops.get(j).undoPrepare();
}