diff options
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 35da132833..262bb2f226 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -48,6 +48,7 @@ import org.apache.qpid.server.txn.*; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -192,18 +193,18 @@ public class AMQChannel routeCurrentMessage(); - _transaction.addPostCommitAction(new ServerTransaction.Action() - { + _transaction.addPostCommitAction(new ServerTransaction.Action() + { - public void postCommit() - { - } + public void postCommit() + { + } - public void onRollback() - { - handle.remove(); - } - }); + public void onRollback() + { + handle.remove(); + } + }); deliverCurrentMessageIfComplete(); @@ -984,11 +985,13 @@ public class AMQChannel { final boolean immediate = _incommingMessage.isImmediate(); - + final AMQMessage amqMessage = createAMQMessage(_incommingMessage); + MessageReference ref = amqMessage.newReference(); for(AMQQueue queue : _destinationQueues) { - QueueEntry entry = queue.enqueue(createAMQMessage(_incommingMessage)); + + QueueEntry entry = queue.enqueue(amqMessage); queue.checkCapacity(AMQChannel.this); @@ -1035,6 +1038,7 @@ public class AMQChannel } } + ref.release(); } catch (AMQException e) { |