summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java')
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java48
1 files changed, 5 insertions, 43 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 3b981b46b8..3d030890e0 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
@@ -35,7 +36,8 @@ import org.apache.qpid.server.txn.ServerTransaction;
public class ExchangeDestination implements ReceivingDestination, SendingDestination
{
private static final Accepted ACCEPTED = new Accepted();
- private static final Outcome[] OUTCOMES = { ACCEPTED };
+ public static final Rejected REJECTED = new Rejected();
+ private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
private Exchange _exchange;
private TerminusDurability _durability;
@@ -78,50 +80,10 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
return null;
}};
- List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
+ int enqueues = _exchange.send(message, instanceProperties, txn, null);
- if(queues == null || queues.isEmpty())
- {
- Exchange altExchange = _exchange.getAlternateExchange();
- if(altExchange != null)
- {
- queues = altExchange.route(message, instanceProperties);
- }
- }
-
- if(queues != null && !queues.isEmpty())
- {
- final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
-
- txn.enqueue(queues,message, new ServerTransaction.Action()
- {
- MessageReference _reference = message.newReference();
-
- public void postCommit()
- {
- for(int i = 0; i < baseQueues.length; i++)
- {
- try
- {
- baseQueues[i].enqueue(message);
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- _reference.release();
- }
-
- public void onRollback()
- {
- _reference.release();
- }
- });
- }
- return ACCEPTED;
+ return enqueues == 0 ? REJECTED : ACCEPTED;
}
TerminusDurability getDurability()