diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-06 08:58:53 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-06 08:58:53 +0000 |
commit | df9ae5995f492547bcb7c2d84b48232382befd4d (patch) | |
tree | 97eaff0dd34834014309f110e37a24d76a9f6a66 | |
parent | 9174267aac0e56de88cf2b7f0e5bfb11721a7cde (diff) | |
download | qpid-python-df9ae5995f492547bcb7c2d84b48232382befd4d.tar.gz |
Allow sending direct to queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1565124 13f79535-47bb-0310-9956-ffa450edef68
12 files changed, 70 insertions, 23 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 97977acb54..bc670bd848 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -428,10 +428,10 @@ public abstract class AbstractExchange implements Exchange return queues; } - public final <C extends Consumer> int send(final ServerMessage message, + public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action<MessageInstance<C>> postEnqueueAction) + final Action<MessageInstance<? extends Consumer>> postEnqueueAction) { List<? extends BaseQueue> queues = route(message, instanceProperties); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index c25d962fb8..33c5218b4c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -335,10 +335,10 @@ public class DefaultExchange implements Exchange return _id; } - public final <C extends Consumer> int send(final ServerMessage message, + public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action<MessageInstance<C>> postEnqueueAction) + final Action<MessageInstance<? extends Consumer>> postEnqueueAction) { final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); if(q == null) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java index 110c7c5bf5..967c629749 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -37,8 +37,8 @@ public interface MessageDestination extends MessageNode * @param postEnqueueAction action to perform on the result of every enqueue (may be null) * @return the number of queues in which the message was enqueued performed */ - <C extends Consumer> int send(ServerMessage message, + int send(ServerMessage message, InstanceProperties instanceProperties, ServerTransaction txn, - Action<MessageInstance<C>> postEnqueueAction); + Action<MessageInstance<? extends Consumer>> postEnqueueAction); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index c39e531d41..97cb66cce4 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -71,7 +71,7 @@ public interface MessageInstance<C extends Consumer> int getMaximumDeliveryCount(); - int routeToAlternate(Action<MessageInstance<C>> action, ServerTransaction txn); + int routeToAlternate(Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn); Filterable asFilterable(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index c4359d8a40..62927edc29 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.consumer.Consumer; @@ -36,7 +37,8 @@ import java.util.Collection; import java.util.List; import java.util.Set; -public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, ExchangeReferrer, BaseQueue<C>, MessageSource<C>, CapacityChecker +public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, ExchangeReferrer, BaseQueue, + MessageSource<C>, CapacityChecker, MessageDestination { public interface NotificationListener diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index fce0572b3b..c1c3bd37e6 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -28,9 +28,9 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; -public interface BaseQueue<C extends Consumer> extends TransactionLogResource +public interface BaseQueue extends TransactionLogResource { - void enqueue(ServerMessage message, Action<MessageInstance<C>> action) throws AMQException; + void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException; boolean isDurable(); boolean isDeleted(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 788a40bf85..8b81a87903 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -364,7 +364,7 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public int routeToAlternate(final Action<MessageInstance<QueueConsumer>> action, ServerTransaction txn) + public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 65007df2d9..44272689fb 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -42,7 +42,10 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -632,7 +635,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, // ------ Enqueue / Dequeue - public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException + public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException { incrementQueueCount(); incrementQueueSize(message); @@ -2184,4 +2187,39 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, return (String) _arguments.get(Queue.DESCRIPTION); } + public final int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<MessageInstance<? extends Consumer>> postEnqueueAction) + { + txn.enqueue(this,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + try + { + SimpleAMQQueue.this.enqueue(message, postEnqueueAction); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + finally + { + _reference.release(); + } + } + + public void onRollback() + { + _reference.release(); + } + }); + return 1; + + } + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 1f1e50a058..cad1aa6d4f 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -20,6 +20,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.util.Action; @@ -50,7 +51,7 @@ public class SortedQueue extends OutOfOrderQueue return _sortedPropertyName; } - public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException + public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException { synchronized (_sortedQueueLock) { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index e9c9a0305f..66c12717db 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -26,12 +26,14 @@ import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -224,6 +226,14 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer> return _name; } + @Override + public int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<MessageInstance<? extends Consumer>> postEnqueueAction) + { + return 0; + } public Collection<QueueConsumer> getConsumers() @@ -302,11 +312,7 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer> return getMessageCount(); } - public void enqueue(ServerMessage message) throws AMQException - { - } - - public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException + public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException { } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 7ab7a07ab6..89b366567d 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -73,7 +73,7 @@ public class MockQueueEntry implements QueueEntry } - public int routeToAlternate(final Action<MessageInstance<QueueConsumer>> action, final ServerTransaction txn) + public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, final ServerTransaction txn) { return 0; } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 0c350c838e..5abc97cee9 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -520,10 +520,10 @@ public class SimpleAMQQueueTest extends QpidTestCase _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); - _queue.enqueue(message, new Action<MessageInstance<QueueConsumer>>() + _queue.enqueue(message, new Action<MessageInstance<? extends Consumer>>() { @Override - public void performAction(final MessageInstance<QueueConsumer> object) + public void performAction(final MessageInstance<? extends Consumer> object) { QueueEntry entry = (QueueEntry) object; entry.setRedelivered(); @@ -1310,7 +1310,7 @@ public class SimpleAMQQueueTest extends QpidTestCase return message; } - private static class EntryListAddingAction implements Action<MessageInstance<QueueConsumer>> + private static class EntryListAddingAction implements Action<MessageInstance<? extends Consumer>> { private final ArrayList<QueueEntry> _queueEntries; @@ -1319,7 +1319,7 @@ public class SimpleAMQQueueTest extends QpidTestCase _queueEntries = queueEntries; } - public void performAction(MessageInstance entry) + public void performAction(MessageInstance<? extends Consumer> entry) { _queueEntries.add((QueueEntry) entry); } |