diff options
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java')
-rw-r--r-- | java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java | 48 |
1 files changed, 5 insertions, 43 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 3b981b46b8..3d030890e0 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.qpid.AMQException; import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.Rejected; import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.exchange.Exchange; @@ -35,7 +36,8 @@ import org.apache.qpid.server.txn.ServerTransaction; public class ExchangeDestination implements ReceivingDestination, SendingDestination { private static final Accepted ACCEPTED = new Accepted(); - private static final Outcome[] OUTCOMES = { ACCEPTED }; + public static final Rejected REJECTED = new Rejected(); + private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED}; private Exchange _exchange; private TerminusDurability _durability; @@ -78,50 +80,10 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina return null; }}; - List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties); + int enqueues = _exchange.send(message, instanceProperties, txn, null); - if(queues == null || queues.isEmpty()) - { - Exchange altExchange = _exchange.getAlternateExchange(); - if(altExchange != null) - { - queues = altExchange.route(message, instanceProperties); - } - } - - if(queues != null && !queues.isEmpty()) - { - final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]); - - txn.enqueue(queues,message, new ServerTransaction.Action() - { - MessageReference _reference = message.newReference(); - - public void postCommit() - { - for(int i = 0; i < baseQueues.length; i++) - { - try - { - baseQueues[i].enqueue(message); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - _reference.release(); - } - - public void onRollback() - { - _reference.release(); - } - }); - } - return ACCEPTED; + return enqueues == 0 ? REJECTED : ACCEPTED; } TerminusDurability getDurability() |