summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-04-16 09:40:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-04-16 09:40:42 +0000
commit56e72efee5eeefa3d73df1f9fbd77058d017a8ee (patch)
tree00533c79bc0c30acc602f500888dd320a419556e
parentf86d63f08e2eb56c7adc68b837cf3cd7ac4bdac5 (diff)
downloadqpid-python-56e72efee5eeefa3d73df1f9fbd77058d017a8ee.tar.gz
QPID-926 : Perform all store operations associated with an acknowledge in a single store transaction
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648648 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java73
1 files changed, 39 insertions, 34 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 1e4b69c935..cac3489f4c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -49,8 +49,6 @@ public class NonTransactionalContext implements TransactionalContext
/** Where to put undeliverable messages */
private final List<RequiredDeliveryException> _returnMessages;
- private final Set<Long> _browsedAcks;
-
private final MessageStore _messageStore;
private final StoreContext _storeContext;
@@ -61,11 +59,17 @@ public class NonTransactionalContext implements TransactionalContext
public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
{
+ this(messageStore,storeContext,channel,returnMessages);
+ }
+
+ public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
+ List<RequiredDeliveryException> returnMessages)
+ {
_channel = channel;
_storeContext = storeContext;
_returnMessages = returnMessages;
_messageStore = messageStore;
- _browsedAcks = browsedAcks;
+
}
@@ -112,6 +116,9 @@ public class NonTransactionalContext implements TransactionalContext
boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap)
throws AMQException
{
+
+ final boolean debug = _log.isDebugEnabled();
+
if (multiple)
{
if (deliveryTag == 0)
@@ -125,20 +132,17 @@ public class NonTransactionalContext implements TransactionalContext
{
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- if (!_browsedAcks.contains(deliveryTag))
+ if (debug)
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Discarding message: " + message.getMessage().getMessageId());
- }
-
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- message.discard(_storeContext);
+ _log.debug("Discarding message: " + message.getMessage().getMessageId());
}
- else
+ if(message.getMessage().isPersistent())
{
- _browsedAcks.remove(deliveryTag);
+ beginTranIfNecessary();
}
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ message.discard(_storeContext);
+
return false;
}
@@ -159,20 +163,17 @@ public class NonTransactionalContext implements TransactionalContext
unacknowledgedMessageMap.drainTo(acked, deliveryTag);
for (UnacknowledgedMessage msg : acked)
{
- if (!_browsedAcks.contains(deliveryTag))
+ if (debug)
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
- }
-
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
+ _log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
- else
+ if(msg.getMessage().isPersistent())
{
- _browsedAcks.remove(deliveryTag);
+ beginTranIfNecessary();
}
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ msg.discard(_storeContext);
}
}
}
@@ -189,27 +190,31 @@ public class NonTransactionalContext implements TransactionalContext
_channel.getChannelId());
}
- if (!_browsedAcks.contains(deliveryTag))
+ if (debug)
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
- }
-
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
+ _log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
- else
+ if(msg.getMessage().isPersistent())
{
- _browsedAcks.remove(deliveryTag);
+ beginTranIfNecessary();
}
- if (_log.isDebugEnabled())
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ msg.discard(_storeContext);
+
+ if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
msg.getMessage().getMessageId());
}
}
+
+ if(_inTran)
+ {
+ _messageStore.commitTran(_storeContext);
+ _inTran = false;
+ }
+
}
public void messageFullyReceived(boolean persistent) throws AMQException