summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java62
1 files changed, 31 insertions, 31 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1724b0e4ec..588b2079f2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -295,26 +296,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_currentMessage.setExpiration();
+ _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
- MessageMetaData mmd = _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
- final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd);
- _currentMessage.setStoredMessage(handle);
-
- routeCurrentMessage();
-
-
- _transaction.addPostTransactionAction(new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- }
-
- public void onRollback()
- {
- handle.remove();
- }
- });
+ _currentMessage.route();
deliverCurrentMessageIfComplete();
}
@@ -346,17 +330,41 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
{
_actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
}
-
}
else
{
+ final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(_currentMessage.getMessageMetaData());
+ _currentMessage.setStoredMessage(handle);
+ int bodyCount = _currentMessage.getBodyCount();
+ if(bodyCount > 0)
+ {
+ long bodyLengthReceived = 0;
+ for(int i = 0 ; i < bodyCount ; i++)
+ {
+ ContentChunk contentChunk = _currentMessage.getContentChunk(i);
+ handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
+ bodyLengthReceived += contentChunk.getSize();
+ }
+ }
+
+ _transaction.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ }
+
+ public void onRollback()
+ {
+ handle.remove();
+ }
+ });
+
_transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
+ updateTransactionalActivity();
+ _currentMessage.getStoredMessage().flushToStore();
}
}
- _currentMessage.getStoredMessage().flushToStore();
-
}
finally
{
@@ -383,9 +391,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
try
{
-
- // returns true iff the message was delivered (i.e. if all data was
- // received
final ContentChunk contentChunk =
_session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
@@ -409,11 +414,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
- protected void routeCurrentMessage() throws AMQException
- {
- _currentMessage.route();
- }
-
public long getNextDeliveryTag()
{
return ++_deliveryTag;