summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java56
1 files changed, 39 insertions, 17 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 68b429efc6..81c8c04d6d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -32,10 +32,8 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
import org.apache.log4j.Logger;
-import java.util.ArrayList;
import java.util.Collection;
public class IncomingMessage implements Filterable<RuntimeException>
@@ -198,19 +196,53 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
else
{
+ int offset;
+ final int queueCount = destinationQueues.size();
+ if(queueCount == 1)
+ {
+ offset = 0;
+ }
+ else
+ {
+ offset = ((int)(message.getMessageId().longValue())) % queueCount;
+ if(offset < 0)
+ {
+ offset = -offset;
+ }
+ }
+
+ int i = 0;
for (AMQQueue q : destinationQueues)
{
- // Increment the references to this message for each queue delivery.
- message.incrementReference();
- // normal deliver so add this message at the end.
- _txnContext.deliver(q, message);
+ if(++i > offset)
+ {
+ // Increment the references to this message for each queue delivery.
+ message.incrementReference();
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(q, message);
+ }
}
+ i = 0;
+ if(offset != 0)
+ {
+ for (AMQQueue q : destinationQueues)
+ {
+ if(i++ < offset)
+ {
+ // Increment the references to this message for each queue delivery.
+ message.incrementReference();
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(q, message);
+ }
+ }
+ }
+
}
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
-
+ message.clearStoreContext();
return message;
}
finally
@@ -257,16 +289,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
return _messagePublishInfo.isImmediate();
}
-
- public void enqueue(final AMQQueue q) throws AMQException
- {
- if(_destinationQueues == null)
- {
- _destinationQueues = new ArrayList<AMQQueue>();
- }
- _destinationQueues.add(q);
- }
-
public ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;