summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java28
1 files changed, 16 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 35da132833..262bb2f226 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -48,6 +48,7 @@ import org.apache.qpid.server.txn.*;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -192,18 +193,18 @@ public class AMQChannel
routeCurrentMessage();
- _transaction.addPostCommitAction(new ServerTransaction.Action()
- {
+ _transaction.addPostCommitAction(new ServerTransaction.Action()
+ {
- public void postCommit()
- {
- }
+ public void postCommit()
+ {
+ }
- public void onRollback()
- {
- handle.remove();
- }
- });
+ public void onRollback()
+ {
+ handle.remove();
+ }
+ });
deliverCurrentMessageIfComplete();
@@ -984,11 +985,13 @@ public class AMQChannel
{
final boolean immediate = _incommingMessage.isImmediate();
-
+ final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
+ MessageReference ref = amqMessage.newReference();
for(AMQQueue queue : _destinationQueues)
{
- QueueEntry entry = queue.enqueue(createAMQMessage(_incommingMessage));
+
+ QueueEntry entry = queue.enqueue(amqMessage);
queue.checkCapacity(AMQChannel.this);
@@ -1035,6 +1038,7 @@ public class AMQChannel
}
}
+ ref.release();
}
catch (AMQException e)
{