From df9ae5995f492547bcb7c2d84b48232382befd4d Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 6 Feb 2014 08:58:53 +0000 Subject: Allow sending direct to queues git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1565124 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/AbstractExchange.java | 4 +-- .../qpid/server/exchange/DefaultExchange.java | 4 +-- .../qpid/server/message/MessageDestination.java | 4 +-- .../qpid/server/message/MessageInstance.java | 2 +- .../org/apache/qpid/server/queue/AMQQueue.java | 4 ++- .../org/apache/qpid/server/queue/BaseQueue.java | 4 +-- .../apache/qpid/server/queue/QueueEntryImpl.java | 2 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 40 +++++++++++++++++++++- .../org/apache/qpid/server/queue/SortedQueue.java | 3 +- .../org/apache/qpid/server/queue/MockAMQQueue.java | 16 ++++++--- .../apache/qpid/server/queue/MockQueueEntry.java | 2 +- .../qpid/server/queue/SimpleAMQQueueTest.java | 8 ++--- 12 files changed, 70 insertions(+), 23 deletions(-) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 97977acb54..bc670bd848 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -428,10 +428,10 @@ public abstract class AbstractExchange implements Exchange return queues; } - public final int send(final ServerMessage message, + public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action> postEnqueueAction) + final Action> postEnqueueAction) { List queues = route(message, instanceProperties); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index c25d962fb8..33c5218b4c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -335,10 +335,10 @@ public class DefaultExchange implements Exchange return _id; } - public final int send(final ServerMessage message, + public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action> postEnqueueAction) + final Action> postEnqueueAction) { final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); if(q == null) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java index 110c7c5bf5..967c629749 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -37,8 +37,8 @@ public interface MessageDestination extends MessageNode * @param postEnqueueAction action to perform on the result of every enqueue (may be null) * @return the number of queues in which the message was enqueued performed */ - int send(ServerMessage message, + int send(ServerMessage message, InstanceProperties instanceProperties, ServerTransaction txn, - Action> postEnqueueAction); + Action> postEnqueueAction); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index c39e531d41..97cb66cce4 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -71,7 +71,7 @@ public interface MessageInstance int getMaximumDeliveryCount(); - int routeToAlternate(Action> action, ServerTransaction txn); + int routeToAlternate(Action> action, ServerTransaction txn); Filterable asFilterable(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index c4359d8a40..62927edc29 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.consumer.Consumer; @@ -36,7 +37,8 @@ import java.util.Collection; import java.util.List; import java.util.Set; -public interface AMQQueue extends Comparable>, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker +public interface AMQQueue extends Comparable>, ExchangeReferrer, BaseQueue, + MessageSource, CapacityChecker, MessageDestination { public interface NotificationListener diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index fce0572b3b..c1c3bd37e6 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -28,9 +28,9 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; -public interface BaseQueue extends TransactionLogResource +public interface BaseQueue extends TransactionLogResource { - void enqueue(ServerMessage message, Action> action) throws AMQException; + void enqueue(ServerMessage message, Action> action) throws AMQException; boolean isDurable(); boolean isDeleted(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 788a40bf85..8b81a87903 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -364,7 +364,7 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public int routeToAlternate(final Action> action, ServerTransaction txn) + public int routeToAlternate(final Action> action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 65007df2d9..44272689fb 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -42,7 +42,10 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -632,7 +635,7 @@ public class SimpleAMQQueue implements AMQQueue, // ------ Enqueue / Dequeue - public void enqueue(ServerMessage message, Action> action) throws AMQException + public void enqueue(ServerMessage message, Action> action) throws AMQException { incrementQueueCount(); incrementQueueSize(message); @@ -2184,4 +2187,39 @@ public class SimpleAMQQueue implements AMQQueue, return (String) _arguments.get(Queue.DESCRIPTION); } + public final int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action> postEnqueueAction) + { + txn.enqueue(this,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + try + { + SimpleAMQQueue.this.enqueue(message, postEnqueueAction); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + finally + { + _reference.release(); + } + } + + public void onRollback() + { + _reference.release(); + } + }); + return 1; + + } + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 1f1e50a058..cad1aa6d4f 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -20,6 +20,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.util.Action; @@ -50,7 +51,7 @@ public class SortedQueue extends OutOfOrderQueue return _sortedPropertyName; } - public void enqueue(ServerMessage message, Action> action) throws AMQException + public void enqueue(ServerMessage message, Action> action) throws AMQException { synchronized (_sortedQueueLock) { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index e9c9a0305f..66c12717db 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -26,12 +26,14 @@ import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -224,6 +226,14 @@ public class MockAMQQueue implements AMQQueue return _name; } + @Override + public int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action> postEnqueueAction) + { + return 0; + } public Collection getConsumers() @@ -302,11 +312,7 @@ public class MockAMQQueue implements AMQQueue return getMessageCount(); } - public void enqueue(ServerMessage message) throws AMQException - { - } - - public void enqueue(ServerMessage message, Action> action) throws AMQException + public void enqueue(ServerMessage message, Action> action) throws AMQException { } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 7ab7a07ab6..89b366567d 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -73,7 +73,7 @@ public class MockQueueEntry implements QueueEntry } - public int routeToAlternate(final Action> action, final ServerTransaction txn) + public int routeToAlternate(final Action> action, final ServerTransaction txn) { return 0; } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 0c350c838e..5abc97cee9 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -520,10 +520,10 @@ public class SimpleAMQQueueTest extends QpidTestCase _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); - _queue.enqueue(message, new Action>() + _queue.enqueue(message, new Action>() { @Override - public void performAction(final MessageInstance object) + public void performAction(final MessageInstance object) { QueueEntry entry = (QueueEntry) object; entry.setRedelivered(); @@ -1310,7 +1310,7 @@ public class SimpleAMQQueueTest extends QpidTestCase return message; } - private static class EntryListAddingAction implements Action> + private static class EntryListAddingAction implements Action> { private final ArrayList _queueEntries; @@ -1319,7 +1319,7 @@ public class SimpleAMQQueueTest extends QpidTestCase _queueEntries = queueEntries; } - public void performAction(MessageInstance entry) + public void performAction(MessageInstance entry) { _queueEntries.add((QueueEntry) entry); } -- cgit v1.2.1