summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-06 08:58:53 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-06 08:58:53 +0000
commitdf9ae5995f492547bcb7c2d84b48232382befd4d (patch)
tree97eaff0dd34834014309f110e37a24d76a9f6a66
parent9174267aac0e56de88cf2b7f0e5bfb11721a7cde (diff)
downloadqpid-python-df9ae5995f492547bcb7c2d84b48232382befd4d.tar.gz
Allow sending direct to queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1565124 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java40
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java3
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java16
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java2
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java8
12 files changed, 70 insertions, 23 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 97977acb54..bc670bd848 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -428,10 +428,10 @@ public abstract class AbstractExchange implements Exchange
return queues;
}
- public final <C extends Consumer> int send(final ServerMessage message,
+ public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<MessageInstance<C>> postEnqueueAction)
+ final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index c25d962fb8..33c5218b4c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -335,10 +335,10 @@ public class DefaultExchange implements Exchange
return _id;
}
- public final <C extends Consumer> int send(final ServerMessage message,
+ public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<MessageInstance<C>> postEnqueueAction)
+ final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
{
final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
index 110c7c5bf5..967c629749 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
@@ -37,8 +37,8 @@ public interface MessageDestination extends MessageNode
* @param postEnqueueAction action to perform on the result of every enqueue (may be null)
* @return the number of queues in which the message was enqueued performed
*/
- <C extends Consumer> int send(ServerMessage message,
+ int send(ServerMessage message,
InstanceProperties instanceProperties,
ServerTransaction txn,
- Action<MessageInstance<C>> postEnqueueAction);
+ Action<MessageInstance<? extends Consumer>> postEnqueueAction);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index c39e531d41..97cb66cce4 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -71,7 +71,7 @@ public interface MessageInstance<C extends Consumer>
int getMaximumDeliveryCount();
- int routeToAlternate(Action<MessageInstance<C>> action, ServerTransaction txn);
+ int routeToAlternate(Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn);
Filterable asFilterable();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index c4359d8a40..62927edc29 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.consumer.Consumer;
@@ -36,7 +37,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
-public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, ExchangeReferrer, BaseQueue<C>, MessageSource<C>, CapacityChecker
+public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, ExchangeReferrer, BaseQueue,
+ MessageSource<C>, CapacityChecker, MessageDestination
{
public interface NotificationListener
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
index fce0572b3b..c1c3bd37e6 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
@@ -28,9 +28,9 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
-public interface BaseQueue<C extends Consumer> extends TransactionLogResource
+public interface BaseQueue extends TransactionLogResource
{
- void enqueue(ServerMessage message, Action<MessageInstance<C>> action) throws AMQException;
+ void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException;
boolean isDurable();
boolean isDeleted();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 788a40bf85..8b81a87903 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -364,7 +364,7 @@ public abstract class QueueEntryImpl implements QueueEntry
dispose();
}
- public int routeToAlternate(final Action<MessageInstance<QueueConsumer>> action, ServerTransaction txn)
+ public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 65007df2d9..44272689fb 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -42,7 +42,10 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -632,7 +635,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
// ------ Enqueue / Dequeue
- public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
{
incrementQueueCount();
incrementQueueSize(message);
@@ -2184,4 +2187,39 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
return (String) _arguments.get(Queue.DESCRIPTION);
}
+ public final int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
+ {
+ txn.enqueue(this,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ try
+ {
+ SimpleAMQQueue.this.enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+
+ }
+
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index 1f1e50a058..cad1aa6d4f 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -20,6 +20,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.util.Action;
@@ -50,7 +51,7 @@ public class SortedQueue extends OutOfOrderQueue
return _sortedPropertyName;
}
- public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
{
synchronized (_sortedQueueLock)
{
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index e9c9a0305f..66c12717db 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -26,12 +26,14 @@ import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -224,6 +226,14 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer>
return _name;
}
+ @Override
+ public int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
+ {
+ return 0;
+ }
public Collection<QueueConsumer> getConsumers()
@@ -302,11 +312,7 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer>
return getMessageCount();
}
- public void enqueue(ServerMessage message) throws AMQException
- {
- }
-
- public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
{
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 7ab7a07ab6..89b366567d 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -73,7 +73,7 @@ public class MockQueueEntry implements QueueEntry
}
- public int routeToAlternate(final Action<MessageInstance<QueueConsumer>> action, final ServerTransaction txn)
+ public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, final ServerTransaction txn)
{
return 0;
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 0c350c838e..5abc97cee9 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -520,10 +520,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
_consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
- _queue.enqueue(message, new Action<MessageInstance<QueueConsumer>>()
+ _queue.enqueue(message, new Action<MessageInstance<? extends Consumer>>()
{
@Override
- public void performAction(final MessageInstance<QueueConsumer> object)
+ public void performAction(final MessageInstance<? extends Consumer> object)
{
QueueEntry entry = (QueueEntry) object;
entry.setRedelivered();
@@ -1310,7 +1310,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
return message;
}
- private static class EntryListAddingAction implements Action<MessageInstance<QueueConsumer>>
+ private static class EntryListAddingAction implements Action<MessageInstance<? extends Consumer>>
{
private final ArrayList<QueueEntry> _queueEntries;
@@ -1319,7 +1319,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queueEntries = queueEntries;
}
- public void performAction(MessageInstance entry)
+ public void performAction(MessageInstance<? extends Consumer> entry)
{
_queueEntries.add((QueueEntry) entry);
}