diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java | 56 |
1 files changed, 39 insertions, 17 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 68b429efc6..81c8c04d6d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -32,10 +32,8 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.AMQException; -import org.apache.qpid.common.ClientProperties; import org.apache.log4j.Logger; -import java.util.ArrayList; import java.util.Collection; public class IncomingMessage implements Filterable<RuntimeException> @@ -198,19 +196,53 @@ public class IncomingMessage implements Filterable<RuntimeException> } else { + int offset; + final int queueCount = destinationQueues.size(); + if(queueCount == 1) + { + offset = 0; + } + else + { + offset = ((int)(message.getMessageId().longValue())) % queueCount; + if(offset < 0) + { + offset = -offset; + } + } + + int i = 0; for (AMQQueue q : destinationQueues) { - // Increment the references to this message for each queue delivery. - message.incrementReference(); - // normal deliver so add this message at the end. - _txnContext.deliver(q, message); + if(++i > offset) + { + // Increment the references to this message for each queue delivery. + message.incrementReference(); + // normal deliver so add this message at the end. + _txnContext.deliver(q, message); + } } + i = 0; + if(offset != 0) + { + for (AMQQueue q : destinationQueues) + { + if(i++ < offset) + { + // Increment the references to this message for each queue delivery. + message.incrementReference(); + // normal deliver so add this message at the end. + _txnContext.deliver(q, message); + } + } + } + } // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery _txnContext.messageFullyReceived(isPersistent()); - + message.clearStoreContext(); return message; } finally @@ -257,16 +289,6 @@ public class IncomingMessage implements Filterable<RuntimeException> return _messagePublishInfo.isImmediate(); } - - public void enqueue(final AMQQueue q) throws AMQException - { - if(_destinationQueues == null) - { - _destinationQueues = new ArrayList<AMQQueue>(); - } - _destinationQueues.add(q); - } - public ContentHeaderBody getContentHeaderBody() { return _contentHeaderBody; |