summaryrefslogtreecommitdiff
path: root/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java62
1 files changed, 46 insertions, 16 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index e2582019cd..71d0f8b4dd 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -36,11 +36,14 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange
@@ -204,22 +207,6 @@ public class DefaultExchange implements Exchange
}
@Override
- public List<AMQQueue> route(ServerMessage message, final InstanceProperties instanceProperties)
- {
- AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
- if(q == null)
- {
- List<AMQQueue> noQueues = Collections.emptyList();
- return noQueues;
- }
- else
- {
- return Collections.singletonList(q);
- }
-
- }
-
- @Override
public boolean isBound(AMQQueue queue)
{
return _virtualHost.getQueue(queue.getName()) == queue;
@@ -343,4 +330,47 @@ public class DefaultExchange implements Exchange
{
return _id;
}
+
+ public final int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final BaseQueue.PostEnqueueAction postEnqueueAction)
+ {
+ final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
+ if(q == null)
+ {
+ return 0;
+ }
+ else
+ {
+ txn.enqueue(q,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ try
+ {
+ q.enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+ }
+ }
+
}