summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-11 16:01:10 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-11 16:01:10 +0000
commiteb12fe81f29d3d50598eafd01a4eda1fad6275cb (patch)
tree8fd481dc85827e6868024ff7132b5e10b4ac6bd1 /java/broker/src
parent4a5cad728b45af81f64d21901968380142d7d111 (diff)
downloadqpid-python-eb12fe81f29d3d50598eafd01a4eda1fad6275cb.tar.gz
Copy over QPID-926
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655330 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java30
1 files changed, 24 insertions, 6 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 38cf14d772..18f1836185 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -110,6 +110,9 @@ public class NonTransactionalContext implements TransactionalContext
boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap)
throws AMQException
{
+
+ final boolean debug = _log.isDebugEnabled();
+ ;
if (multiple)
{
if (deliveryTag == 0)
@@ -123,11 +126,14 @@ public class NonTransactionalContext implements TransactionalContext
{
public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
{
- if (_log.isDebugEnabled())
+ if (debug)
{
_log.debug("Discarding message: " + message.getMessage().getMessageId());
}
-
+ if(message.getMessage().isPersistent())
+ {
+ beginTranIfNecessary();
+ }
message.restoreCredit();
//Message has been ack so discard it. This will dequeue and decrement the reference.
message.discard(_storeContext);
@@ -152,11 +158,14 @@ public class NonTransactionalContext implements TransactionalContext
unacknowledgedMessageMap.drainTo(acked, deliveryTag);
for (QueueEntry msg : acked)
{
- if (_log.isDebugEnabled())
+ if (debug)
{
_log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
-
+ if(msg.getMessage().isPersistent())
+ {
+ beginTranIfNecessary();
+ }
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
@@ -176,20 +185,29 @@ public class NonTransactionalContext implements TransactionalContext
_channel.getChannelId());
}
- if (_log.isDebugEnabled())
+ if (debug)
{
_log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
+ if(msg.getMessage().isPersistent())
+ {
+ beginTranIfNecessary();
+ }
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
- if (_log.isDebugEnabled())
+ if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
msg.getMessage().getMessageId());
}
}
+ if(_inTran)
+ {
+ _messageStore.commitTran(_storeContext);
+ _inTran = false;
+ }
}
public void messageFullyReceived(boolean persistent) throws AMQException