summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-05 11:59:49 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-05 11:59:49 +0000
commit2caac132114f8f1f7877600b7bef21bb3681fdd3 (patch)
tree0a0a629b30d499e95aff20280f4242ba80923d42
parent7a54b9a25cf96675325a8cb6bfd1d2e4f43b8edd (diff)
downloadqpid-python-2caac132114f8f1f7877600b7bef21bb3681fdd3.tar.gz
Use abstractions for sources and destinations for message ingress and egress in all protocol transports
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564721 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java3
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java3
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java23
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java43
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java107
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java26
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java79
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java3
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java11
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java17
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java7
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java3
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java2
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java2
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java2
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java15
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java4
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java3
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java7
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java24
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java14
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java6
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java17
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java21
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java47
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java18
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java15
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java10
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java3
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java2
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java106
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java8
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java21
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java12
39 files changed, 475 insertions, 223 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 20bde5f613..bc5cdaa268 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -429,7 +430,7 @@ public abstract class AbstractExchange implements Exchange
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<QueueEntry> postEnqueueAction)
+ final Action<MessageInstance> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index db3464c463..dd0121d91b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -336,7 +337,7 @@ public class DefaultExchange implements Exchange
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<QueueEntry> postEnqueueAction)
+ final Action<MessageInstance> postEnqueueAction)
{
final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index af1eed9032..6d83fdb2a1 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -24,22 +24,16 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-public interface Exchange extends ExchangeReferrer
+public interface Exchange extends ExchangeReferrer, MessageDestination
{
void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete)
throws AMQException;
@@ -97,19 +91,6 @@ public interface Exchange extends ExchangeReferrer
void close() throws AMQException;
/**
- * Routes a message
- * @param message the message to be routed
- * @param instanceProperties the instance properties
- * @param txn the transaction to enqueue within
- * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
- * @return the number of queues in which the message was enqueued performed
- */
- int send(ServerMessage message,
- InstanceProperties instanceProperties,
- ServerTransaction txn,
- Action<QueueEntry> postEnqueueAction);
-
- /**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
* @param bindingKey
* @param arguments
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
new file mode 100644
index 0000000000..c6eb8b2a2b
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+
+public interface MessageDestination
+{
+
+ public String getName();
+
+ /**
+ * Routes a message
+ * @param message the message to be routed
+ * @param instanceProperties the instance properties
+ * @param txn the transaction to enqueue within
+ * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
+ * @return the number of queues in which the message was enqueued performed
+ */
+ int send(ServerMessage message,
+ InstanceProperties instanceProperties,
+ ServerTransaction txn,
+ Action<MessageInstance> postEnqueueAction);
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 733fded846..bbe80c1db7 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -45,9 +45,9 @@ public interface MessageInstance
void decrementDeliveryCount();
- void addStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+ void addStateChangeListener(StateChangeListener<MessageInstance, State> listener);
- boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+ boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener);
boolean acquiredByConsumer();
@@ -71,7 +71,7 @@ public interface MessageInstance
int getMaximumDeliveryCount();
- int routeToAlternate(Action<QueueEntry> action, ServerTransaction txn);
+ int routeToAlternate(Action<MessageInstance> action, ServerTransaction txn);
Filterable asFilterable();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
new file mode 100644
index 0000000000..1abe3671ff
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.store.TransactionLogResource;
+
+import java.util.Collection;
+import java.util.EnumSet;
+
+public interface MessageSource extends TransactionLogResource
+{
+ Consumer addConsumer(ConsumerTarget target, FilterManager filters,
+ Class<? extends ServerMessage> messageClass,
+ String consumerName, EnumSet<Consumer.Option> options) throws AMQException;
+
+ Collection<Consumer> getConsumers();
+
+ void addConsumerRegistrationListener(ConsumerRegistrationListener listener);
+
+ void removeConsumerRegistrationListener(ConsumerRegistrationListener listener);
+
+ AuthorizationHolder getAuthorizationHolder();
+
+ void setAuthorizationHolder(AuthorizationHolder principalHolder);
+
+ void setExclusiveOwningSession(AMQSessionModel owner);
+
+ AMQSessionModel getExclusiveOwningSession();
+
+ boolean isExclusive();
+
+ void enqueue(ServerMessage message) throws AMQException;
+
+ interface ConsumerRegistrationListener
+ {
+ void consumerAdded(AMQQueue queue, Consumer consumer);
+ void consumerRemoved(AMQQueue queue, Consumer consumer);
+ }
+
+ /**
+ * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create a consumer, because an exclusive consumer already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Move to top level, used outside this class.
+ */
+ static final class ExistingExclusiveConsumer extends AMQException
+ {
+
+ public ExistingExclusiveConsumer()
+ {
+ super("");
+ }
+ }
+
+ /**
+ * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create an exclusive consumer, as a consumer already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Move to top level, used outside this class.
+ */
+ static final class ExistingConsumerPreventsExclusive extends AMQException
+ {
+ public ExistingConsumerPreventsExclusive()
+ {
+ super("");
+ }
+ }
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
new file mode 100644
index 0000000000..0ba3095243
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+public interface CapacityChecker
+{
+ void checkCapacity(AMQSessionModel channel);
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 018ba454e4..4fe6117d88 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -25,25 +25,19 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.consumer.Consumer;
-import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
-import java.util.EnumSet;
import java.util.List;
import java.util.Set;
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue
+public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker
{
- String getName();
public interface NotificationListener
{
@@ -75,29 +69,9 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
boolean isAutoDelete();
String getOwner();
- AuthorizationHolder getAuthorizationHolder();
- void setAuthorizationHolder(AuthorizationHolder principalHolder);
-
- void setExclusiveOwningSession(AMQSessionModel owner);
- AMQSessionModel getExclusiveOwningSession();
VirtualHost getVirtualHost();
- Consumer addConsumer(final ConsumerTarget target, final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final String consumerName, EnumSet<Consumer.Option> options) throws AMQException;
-
- Collection<Consumer> getConsumers();
-
- interface ConsumerRegistrationListener
- {
- void consumerAdded(AMQQueue queue, Consumer consumer);
- void consumerRemoved(AMQQueue queue, Consumer consumer);
- }
-
- void addConsumerRegistrationListener(ConsumerRegistrationListener listener);
- void removeConsumerRegistrationListener(ConsumerRegistrationListener listener);
-
int getConsumerCount();
@@ -215,8 +189,6 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
void stop();
- boolean isExclusive();
-
Exchange getAlternateExchange();
void setAlternateExchange(Exchange exchange);
@@ -224,51 +196,6 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
Collection<String> getAvailableAttributes();
Object getAttribute(String attrName);
- void checkCapacity(AMQSessionModel channel);
-
- /**
- * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create a consumer, because an exclusive consumer already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- static final class ExistingExclusiveConsumer extends AMQException
- {
-
- public ExistingExclusiveConsumer()
- {
- super("");
- }
- }
-
- /**
- * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create an exclusive consumer, as a consumer already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- static final class ExistingConsumerPreventsExclusive extends AMQException
- {
- public ExistingConsumerPreventsExclusive()
- {
- super("");
- }
- }
-
void configure(QueueConfiguration config);
void setExclusive(boolean exclusive);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
index bce2bd67cc..972488da4b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
@@ -22,6 +22,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
@@ -29,7 +30,7 @@ import org.apache.qpid.server.util.Action;
public interface BaseQueue extends TransactionLogResource
{
void enqueue(ServerMessage message) throws AMQException;
- void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException;
+ void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException;
boolean isDurable();
boolean isDeleted();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
index d5c03abc93..22cb6aeb7b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,19 +241,19 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
return groupVal;
}
- private class GroupStateChangeListener implements StateChangeListener<QueueEntry, QueueEntry.State>
+ private class GroupStateChangeListener implements StateChangeListener<MessageInstance, QueueEntry.State>
{
private final Group _group;
public GroupStateChangeListener(final Group group,
- final QueueEntry entry)
+ final MessageInstance entry)
{
_group = group;
}
- public void stateChanged(final QueueEntry entry,
- final QueueEntry.State oldState,
- final QueueEntry.State newState)
+ public void stateChanged(final MessageInstance entry,
+ final MessageInstance.State oldState,
+ final MessageInstance.State newState)
{
synchronized (DefinedGroupMessageGroupManager.this)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 7908cc9de7..93bb3a8c61 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -62,7 +63,7 @@ public abstract class QueueEntryImpl implements QueueEntry
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener<QueueEntry, State>> _stateChangeListeners;
+ private volatile Set<StateChangeListener<MessageInstance, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -332,7 +333,7 @@ public abstract class QueueEntryImpl implements QueueEntry
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener<QueueEntry, State> l : _stateChangeListeners)
+ for(StateChangeListener<MessageInstance, State> l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
@@ -363,7 +364,7 @@ public abstract class QueueEntryImpl implements QueueEntry
dispose();
}
- public int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn)
+ public int routeToAlternate(final Action<MessageInstance> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -408,21 +409,21 @@ public abstract class QueueEntryImpl implements QueueEntry
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+ public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener)
{
- Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<QueueEntry, State>>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+ public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener)
{
- Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 78585997be..7435c690b3 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -635,7 +636,7 @@ public class SimpleAMQQueue implements AMQQueue,
enqueue(message, null);
}
- public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
{
incrementQueueCount();
incrementQueueSize(message);
@@ -1967,7 +1968,7 @@ public class SimpleAMQQueue implements AMQQueue,
return _notificationChecks;
}
- private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State>
+ private final class QueueEntryListener implements StateChangeListener<MessageInstance, QueueEntry.State>
{
private final QueueConsumer _sub;
@@ -1988,7 +1989,7 @@ public class SimpleAMQQueue implements AMQQueue,
return System.identityHashCode(_sub);
}
- public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
+ public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
deliverAsync(_sub);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index 44bda5182a..3185abc6cd 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -20,6 +20,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -49,7 +50,7 @@ public class SortedQueue extends OutOfOrderQueue
return _sortedPropertyName;
}
- public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
{
synchronized (_sortedQueueLock)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 03717ed6ae..57c67f54cd 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -212,7 +212,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index b9d91a647b..4ea48c6a24 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -154,7 +154,7 @@ public class AutoCommitTransaction implements ServerTransaction
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
index 238facf4b5..4a7c16a7cd 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
@@ -105,7 +105,7 @@ public class DistributedTransaction implements ServerTransaction
}
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
if(_branch != null)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
index ada4eeb553..2505548ab8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
@@ -363,7 +363,7 @@ public class DtxBranch
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message)
{
_enqueueRecords.add(new Record(queue, message));
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 93482153d3..4b02d4f8ec 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -197,7 +197,7 @@ public class LocalTransaction implements ServerTransaction
}
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
index 3355a7ed06..cae5fa73bf 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
@@ -94,7 +94,7 @@ public interface ServerTransaction
*
* A store operation will result only for a persistent message on a durable queue.
*/
- void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction);
+ void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
/**
* Enqueue a message(s) to queue(s) registering a post transaction action.
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 5859ce3c68..99b7407bde 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -47,6 +47,8 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -441,6 +443,12 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
@Override
+ public MessageSource getMessageSource(final String name)
+ {
+ return getQueue(name);
+ }
+
+ @Override
public AMQQueue getQueue(UUID id)
{
return _queueRegistry.getQueue(id);
@@ -524,6 +532,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
+
+ @Override
+ public MessageDestination getMessageDestination(final String name)
+ {
+ return getExchange(name);
+ }
+
@Override
public Exchange getExchange(String name)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 2ebbedccd4..0c1b949e62 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -30,6 +30,8 @@ import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
@@ -49,6 +51,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
String getName();
AMQQueue getQueue(String name);
+ MessageSource getMessageSource(String name);
AMQQueue getQueue(UUID id);
@@ -76,6 +79,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
void removeExchange(Exchange exchange, boolean force) throws AMQException;
+ MessageDestination getMessageDestination(String name);
Exchange getExchange(String name);
Exchange getExchange(UUID id);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 36fd92004a..1158781beb 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
@@ -305,7 +306,7 @@ public class MockAMQQueue implements AMQQueue
{
}
- public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
{
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index b3a124a6bd..95a6030d6a 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.consumer.Consumer;
@@ -62,7 +63,7 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+ public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener)
{
}
@@ -72,7 +73,7 @@ public class MockQueueEntry implements QueueEntry
}
- public int routeToAlternate(final Action<QueueEntry> action, final ServerTransaction txn)
+ public int routeToAlternate(final Action<MessageInstance> action, final ServerTransaction txn)
{
return 0;
}
@@ -152,7 +153,7 @@ public class MockQueueEntry implements QueueEntry
}
- public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+ public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener)
{
return false;
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 542f6ba0d1..62120f26d3 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -245,11 +245,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+ Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
{
- public void performAction(QueueEntry entry)
+ public void performAction(MessageInstance entry)
{
- queueEntries.add(entry);
+ queueEntries.add((QueueEntry) entry);
}
};
@@ -298,11 +298,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
Consumer.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+ Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
{
- public void performAction(QueueEntry entry)
+ public void performAction(MessageInstance entry)
{
- queueEntries.add(entry);
+ queueEntries.add((QueueEntry) entry);
}
};
@@ -356,11 +356,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+ Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
{
- public void performAction(QueueEntry entry)
+ public void performAction(MessageInstance entry)
{
- queueEntries.add(entry);
+ queueEntries.add((QueueEntry) entry);
}
};
@@ -420,11 +420,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+ Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
{
- public void performAction(QueueEntry entry)
+ public void performAction(MessageInstance entry)
{
- queueEntries.add(entry);
+ queueEntries.add((QueueEntry)entry);
}
};
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 1ca7ff1b65..832b89c81a 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -27,6 +27,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
@@ -127,6 +129,12 @@ public class MockVirtualHost implements VirtualHost
}
@Override
+ public MessageSource getMessageSource(final String name)
+ {
+ return null;
+ }
+
+ @Override
public AMQQueue getQueue(UUID id)
{
return null;
@@ -174,6 +182,12 @@ public class MockVirtualHost implements VirtualHost
}
@Override
+ public MessageDestination getMessageDestination(final String name)
+ {
+ return null;
+ }
+
+ @Override
public Exchange getExchange(String name)
{
return null;
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 2093490ee2..c478956629 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -385,13 +385,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
final LogActor logActor = CurrentActor.get();
final ServerMessage msg = entry.getMessage();
- int requeues = entry.routeToAlternate(new Action<QueueEntry>()
+ int requeues = entry.routeToAlternate(new Action<MessageInstance>()
{
@Override
- public void performAction(final QueueEntry requeueEntry)
+ public void performAction(final MessageInstance requeueEntry)
{
logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getQueue().getName()));
+ requeueEntry.getOwningResource().getName()));
}
}, null);
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 67b3740bb6..b5c4724292 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -46,7 +46,6 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -55,14 +54,16 @@ import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -104,12 +105,16 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final Action<QueueEntry> _checkCapacityAction = new Action<QueueEntry>()
+ private final Action<MessageInstance> _checkCapacityAction = new Action<MessageInstance>()
{
@Override
- public void performAction(final QueueEntry entry)
+ public void performAction(final MessageInstance entry)
{
- entry.getQueue().checkCapacity(ServerSession.this);
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(ServerSession.this);
+ }
}
};
@@ -188,7 +193,7 @@ public class ServerSession extends Session
public int enqueue(final MessageTransferMessage message,
final InstanceProperties instanceProperties,
- final Exchange exchange)
+ final MessageDestination exchange)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index d1b24a2fdb..9a90b74656 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -35,7 +35,9 @@ import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -196,7 +198,7 @@ public class ServerSessionDelegate extends SessionDelegate
String queueName = method.getQueue();
VirtualHost vhost = getVirtualHost(session);
- final AMQQueue queue = vhost.getQueue(queueName);
+ final MessageSource queue = vhost.getMessageSource(queueName);
if(queue == null)
{
@@ -308,7 +310,7 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
- final Exchange exchange = getExchangeForMessage(ssn, xfr);
+ final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -327,7 +329,6 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
- final Exchange exchangeInUse;
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
final ServerSession serverSession = (ServerSession) ssn;
@@ -829,24 +830,24 @@ public class ServerSessionDelegate extends SessionDelegate
return getVirtualHost(session).getExchange(exchangeName);
}
- private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
+ private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr)
{
VirtualHost virtualHost = getVirtualHost(ssn);
- Exchange exchange;
+ MessageDestination destination;
if(xfr.hasDestination())
{
- exchange = virtualHost.getExchange(xfr.getDestination());
- if(exchange == null)
+ destination = virtualHost.getMessageDestination(xfr.getDestination());
+ if(destination == null)
{
- exchange = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultExchange();
}
}
else
{
- exchange = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultExchange();
}
- return exchange;
+ return destination;
}
private VirtualHost getVirtualHost(Session session)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index d2b1f83513..dc9a6484fa 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -56,9 +56,12 @@ import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -256,7 +259,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
return _channelId;
}
- public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
+ public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException
{
String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
SecurityManager securityManager = getVirtualHost().getSecurityManager();
@@ -265,7 +268,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
throw new AMQSecurityException("Permission denied: " + e.getName());
}
_currentMessage = new IncomingMessage(info);
- _currentMessage.setExchange(e);
+ _currentMessage.setMessageDestination(e);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
@@ -350,7 +353,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
};
- int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+ int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction,
immediate ? _immediateAction : _capacityCheckAction);
if(enqueues == 0)
{
@@ -497,19 +500,19 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
* Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
* up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
*
+ *
* @param tag the tag chosen by the client (if null, server will generate one)
- * @param queue the queue to subscribe to
+ * @param source the queue to subscribe to
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
- * @param noLocal Flag stopping own messages being received.
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
* @throws AMQException if something goes wrong
*/
- public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+ FieldTable filters, boolean exclusive) throws AMQException
{
if (tag == null)
{
@@ -557,7 +560,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
try
{
Consumer sub =
- queue.addConsumer(target,
+ source.addConsumer(target,
FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
AMQMessage.class,
AMQShortString.toString(tag),
@@ -1189,16 +1192,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- private class ImmediateAction implements Action<QueueEntry>
+ private class ImmediateAction implements Action<MessageInstance>
{
public ImmediateAction()
{
}
- public void performAction(QueueEntry entry)
+ public void performAction(MessageInstance entry)
{
- AMQQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if (!entry.getDeliveredToConsumer() && entry.acquire())
{
@@ -1246,19 +1249,25 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
else
{
- queue.checkCapacity(AMQChannel.this);
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+ }
}
}
}
- private final class CapacityCheckAction implements Action<QueueEntry>
+ private final class CapacityCheckAction implements Action<MessageInstance>
{
@Override
- public void performAction(final QueueEntry entry)
+ public void performAction(final MessageInstance entry)
{
- AMQQueue queue = entry.getQueue();
- queue.checkCapacity(AMQChannel.this);
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+ }
}
}
@@ -1477,13 +1486,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
final ServerMessage msg = rejectedQueueEntry.getMessage();
final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
- int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>()
+ int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
{
@Override
- public void performAction(final QueueEntry requeueEntry)
+ public void performAction(final MessageInstance requeueEntry)
{
_actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getQueue().getName()));
+ requeueEntry.getOwningResource().getName()));
}
}, null);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 536bc18964..47700f812f 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -49,13 +49,13 @@ import java.util.concurrent.atomic.AtomicLong;
public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
- private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener =
- new StateChangeListener<QueueEntry, QueueEntry.State>()
+ private final StateChangeListener<MessageInstance, MessageInstance.State> _entryReleaseListener =
+ new StateChangeListener<MessageInstance, MessageInstance.State>()
{
@Override
- public void stateChanged(final QueueEntry entry,
- final QueueEntry.State oldSate,
- final QueueEntry.State newState)
+ public void stateChanged(final MessageInstance entry,
+ final MessageInstance.State oldSate,
+ final MessageInstance.State newState)
{
if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
{
@@ -463,7 +463,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
_creditManager.restoreCredit(1, message.getSize());
}
- protected final StateChangeListener<QueueEntry, QueueEntry.State> getReleasedStateChangeListener()
+ protected final StateChangeListener<MessageInstance, MessageInstance.State> getReleasedStateChangeListener()
{
return _entryReleaseListener;
}
@@ -526,11 +526,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
final long size = entry.getMessage().getSize();
_unacknowledgedBytes.addAndGet(size);
_unacknowledgedCount.incrementAndGet();
- entry.addStateChangeListener(new StateChangeListener<QueueEntry, QueueEntry.State>()
+ entry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
{
- public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
+ public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
{
- if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
+ if(oldState.equals(MessageInstance.State.ACQUIRED) && !newState.equals(MessageInstance.State.ACQUIRED))
{
_unacknowledgedBytes.addAndGet(-size);
_unacknowledgedCount.decrementAndGet();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
index 5a9a51ff59..80c4c77b65 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
@@ -20,15 +20,12 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageDestination;
import java.util.ArrayList;
import java.util.List;
@@ -38,7 +35,7 @@ public class IncomingMessage
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private Exchange _exchange;
+ private MessageDestination _messageDestination;
/**
* Keeps a track of how many bytes we have received in body frames
@@ -77,9 +74,9 @@ public class IncomingMessage
return _messagePublishInfo.getExchange();
}
- public Exchange getExchange()
+ public MessageDestination getDestination()
{
- return _exchange;
+ return _messageDestination;
}
public ContentHeaderBody getContentHeader()
@@ -92,9 +89,9 @@ public class IncomingMessage
return getContentHeader().getBodySize();
}
- public void setExchange(final Exchange e)
+ public void setMessageDestination(final MessageDestination e)
{
- _exchange = e;
+ _messageDestination = e;
}
public int getBodyCount() throws AMQException
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index c2d02c1df8..526bc9b9fe 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
" args:" + body.getArguments());
}
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+ MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
if (queue == null)
{
@@ -120,8 +121,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
{
- AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
- body.getArguments(), body.getNoLocal(), body.getExclusive());
+ AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+ queue,
+ !body.getNoAck(),
+ body.getArguments(),
+ body.getExclusive());
if (!body.getNowait())
{
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
index 497e97db3e..f8a7722447 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -67,7 +68,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic
}
VirtualHost vHost = session.getVirtualHost();
- Exchange exch = vHost.getExchange(exchangeName.toString());
+ MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
// if the exchange does not exist we raise a channel exception
if (exch == null)
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index bb5fecdfb4..281f7345ff 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends QpidTestCase
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true);
+ AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true);
getQueue().deliverAsync();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index 247bc53cd1..479c715b2a 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -144,6 +144,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.subscribeToQueue(null, queue, true, filters, false, true);
+ return channel.consumeFromSource(null, queue, true, filters, true);
}
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
new file mode 100644
index 0000000000..70f659b546
--- /dev/null
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.txn.ServerTransaction;
+
+public class NodeReceivingDestination implements ReceivingDestination
+{
+ private static final Accepted ACCEPTED = new Accepted();
+ public static final Rejected REJECTED = new Rejected();
+ private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
+
+ private MessageDestination _exchange;
+ private TerminusDurability _durability;
+ private TerminusExpiryPolicy _expiryPolicy;
+
+ public NodeReceivingDestination(MessageDestination exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
+ {
+ _exchange = exchange;
+ _durability = durable;
+ _expiryPolicy = expiryPolicy;
+ }
+
+ public Outcome[] getOutcomes()
+ {
+ return OUTCOMES;
+ }
+
+ public Outcome send(final Message_1_0 message, ServerTransaction txn)
+ {
+ final InstanceProperties instanceProperties =
+ new InstanceProperties()
+ {
+
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case MANDATORY:
+ return false;
+ case REDELIVERED:
+ return false;
+ case PERSISTENT:
+ return message.isPersistent();
+ case IMMEDIATE:
+ return false;
+ case EXPIRATION:
+ return message.getExpiration();
+ }
+ return null;
+ }};
+
+ int enqueues = _exchange.send(message, instanceProperties, txn, null);
+
+
+ return enqueues == 0 ? REJECTED : ACCEPTED;
+ }
+
+ TerminusDurability getDurability()
+ {
+ return _durability;
+ }
+
+ TerminusExpiryPolicy getExpiryPolicy()
+ {
+ return _expiryPolicy;
+ }
+
+ public int getCredit()
+ {
+ // TODO - fix
+ return 20000;
+ }
+
+ public MessageDestination getDestination()
+ {
+ return _exchange;
+ }
+}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
index b9c10b925f..c2d124b427 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -35,9 +36,9 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio
private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
- private AMQQueue _queue;
+ private MessageSource _queue;
- public QueueDestination(AMQQueue queue)
+ public QueueDestination(MessageSource queue)
{
_queue = queue;
}
@@ -60,7 +61,6 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio
{
try
{
-
_queue.enqueue(message);
}
catch (Exception e)
@@ -91,7 +91,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio
return 100;
}
- public AMQQueue getQueue()
+ public MessageSource getQueue()
{
return _queue;
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 2fff1856c7..f7867d6178 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -66,6 +66,7 @@ import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.consumer.Consumer;
@@ -95,7 +96,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
private Runnable _closeAction;
- private final AMQQueue _queue;
+ private final MessageSource _queue;
public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
@@ -121,7 +122,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
_queue = ((QueueDestination) _destination).getQueue();
- if(_queue.getAvailableAttributes().contains("topic"))
+ if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic"))
{
source.setDistributionMode(StdDistMode.COPY);
}
@@ -217,7 +218,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(queue == null)
{
- _queue = _vhost.createQueue(
+ queue = _vhost.createQueue(
UUIDGenerator.generateQueueUUID(name, _vhost.getName()),
name,
isDurable,
@@ -229,8 +230,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
else
{
- _queue = queue;
- List<Binding> bindings = _queue.getBindings();
+ List<Binding> bindings = queue.getBindings();
List<Binding> bindingsToRemove = new ArrayList<Binding>();
for(Binding existingBinding : bindings)
{
@@ -313,15 +313,16 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
}
}
+ _queue = queue;
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- exchange.addBinding(binding, _queue,null);
+ exchange.addBinding(binding, queue,null);
source.setDistributionMode(StdDistMode.COPY);
if(!isDurable)
{
final String queueName = name;
- final AMQQueue tempQueue = _queue;
+ final AMQQueue tempQueue = queue;
final Action<Connection_1_0> deleteQueueTask =
new Action<Connection_1_0>()
@@ -345,7 +346,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
- _queue.addQueueDeleteTask(new Action<AMQQueue>()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
public void performAction(AMQQueue queue)
{
@@ -356,7 +357,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
});
}
- qd = new QueueDestination(_queue);
+ qd = new QueueDestination(queue);
}
catch (AMQSecurityException e)
{
@@ -454,7 +455,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
try
{
- _queue.getVirtualHost().removeQueue(_queue);
+ _vhost.removeQueue((AMQQueue)_queue);
}
catch(AMQException e)
{
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 51ff9c13cb..c7508fa913 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -41,6 +41,8 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -109,7 +111,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
- AMQQueue queue = _vhost.getQueue(addr);
+ MessageSource queue = _vhost.getMessageSource(addr);
if(queue != null)
{
@@ -250,11 +252,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
String addr = target.getAddress();
- Exchange exchg = _vhost.getExchange(addr);
- if(exchg != null)
+ MessageDestination messageDestination = _vhost.getMessageDestination(addr);
+ if(messageDestination != null)
{
- destination = new ExchangeDestination(exchg, target.getDurable(),
- target.getExpiryPolicy());
+ destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+ target.getExpiryPolicy());
}
else
{