diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java | 62 |
1 files changed, 46 insertions, 16 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index e2582019cd..71d0f8b4dd 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -36,11 +36,14 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchange implements Exchange @@ -204,22 +207,6 @@ public class DefaultExchange implements Exchange } @Override - public List<AMQQueue> route(ServerMessage message, final InstanceProperties instanceProperties) - { - AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); - if(q == null) - { - List<AMQQueue> noQueues = Collections.emptyList(); - return noQueues; - } - else - { - return Collections.singletonList(q); - } - - } - - @Override public boolean isBound(AMQQueue queue) { return _virtualHost.getQueue(queue.getName()) == queue; @@ -343,4 +330,47 @@ public class DefaultExchange implements Exchange { return _id; } + + public final int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final BaseQueue.PostEnqueueAction postEnqueueAction) + { + final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); + if(q == null) + { + return 0; + } + else + { + txn.enqueue(q,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + try + { + q.enqueue(message, postEnqueueAction); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + finally + { + _reference.release(); + } + } + + public void onRollback() + { + _reference.release(); + } + }); + return 1; + } + } + } |