From 2caac132114f8f1f7877600b7bef21bb3681fdd3 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 5 Feb 2014 11:59:49 +0000 Subject: Use abstractions for sources and destinations for message ingress and egress in all protocol transports git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564721 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/AbstractExchange.java | 3 +- .../qpid/server/exchange/DefaultExchange.java | 3 +- .../org/apache/qpid/server/exchange/Exchange.java | 23 +---- .../qpid/server/message/MessageDestination.java | 43 +++++++++ .../qpid/server/message/MessageInstance.java | 6 +- .../apache/qpid/server/message/MessageSource.java | 107 +++++++++++++++++++++ .../qpid/server/protocol/CapacityChecker.java | 26 +++++ .../org/apache/qpid/server/queue/AMQQueue.java | 79 +-------------- .../org/apache/qpid/server/queue/BaseQueue.java | 3 +- .../queue/DefinedGroupMessageGroupManager.java | 11 ++- .../apache/qpid/server/queue/QueueEntryImpl.java | 17 ++-- .../apache/qpid/server/queue/SimpleAMQQueue.java | 7 +- .../org/apache/qpid/server/queue/SortedQueue.java | 3 +- .../server/txn/AsyncAutoCommitTransaction.java | 2 +- .../qpid/server/txn/AutoCommitTransaction.java | 2 +- .../qpid/server/txn/DistributedTransaction.java | 2 +- .../java/org/apache/qpid/server/txn/DtxBranch.java | 2 +- .../apache/qpid/server/txn/LocalTransaction.java | 2 +- .../apache/qpid/server/txn/ServerTransaction.java | 2 +- .../server/virtualhost/AbstractVirtualHost.java | 15 +++ .../qpid/server/virtualhost/VirtualHost.java | 4 + .../org/apache/qpid/server/queue/MockAMQQueue.java | 3 +- .../apache/qpid/server/queue/MockQueueEntry.java | 7 +- .../qpid/server/queue/SimpleAMQQueueTest.java | 24 ++--- .../qpid/server/virtualhost/MockVirtualHost.java | 14 +++ .../server/protocol/v0_10/ConsumerTarget_0_10.java | 6 +- .../qpid/server/protocol/v0_10/ServerSession.java | 17 ++-- .../protocol/v0_10/ServerSessionDelegate.java | 21 ++-- .../qpid/server/protocol/v0_8/AMQChannel.java | 47 +++++---- .../server/protocol/v0_8/ConsumerTarget_0_8.java | 18 ++-- .../qpid/server/protocol/v0_8/IncomingMessage.java | 15 ++- .../v0_8/handler/BasicConsumeMethodHandler.java | 10 +- .../v0_8/handler/BasicPublishMethodHandler.java | 3 +- .../qpid/server/protocol/v0_8/AcknowledgeTest.java | 2 +- .../protocol/v0_8/QueueBrowserUsesNoAckTest.java | 2 +- .../protocol/v1_0/NodeReceivingDestination.java | 106 ++++++++++++++++++++ .../server/protocol/v1_0/QueueDestination.java | 8 +- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 21 ++-- .../qpid/server/protocol/v1_0/Session_1_0.java | 12 ++- 39 files changed, 475 insertions(+), 223 deletions(-) create mode 100644 java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java create mode 100644 java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java create mode 100644 java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java create mode 100644 java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 20bde5f613..bc5cdaa268 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.logging.subjects.ExchangeLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; @@ -429,7 +430,7 @@ public abstract class AbstractExchange implements Exchange public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action postEnqueueAction) + final Action postEnqueueAction) { List queues = route(message, instanceProperties); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index db3464c463..dd0121d91b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; @@ -336,7 +337,7 @@ public class DefaultExchange implements Exchange public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action postEnqueueAction) + final Action postEnqueueAction) { final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); if(q == null) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java index af1eed9032..6d83fdb2a1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -24,22 +24,16 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.UUID; -public interface Exchange extends ExchangeReferrer +public interface Exchange extends ExchangeReferrer, MessageDestination { void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete) throws AMQException; @@ -96,19 +90,6 @@ public interface Exchange extends ExchangeReferrer void close() throws AMQException; - /** - * Routes a message - * @param message the message to be routed - * @param instanceProperties the instance properties - * @param txn the transaction to enqueue within - * @param postEnqueueAction action to perform on the result of every enqueue (may be null) - * @return the number of queues in which the message was enqueued performed - */ - int send(ServerMessage message, - InstanceProperties instanceProperties, - ServerTransaction txn, - Action postEnqueueAction); - /** * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments * @param bindingKey diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java new file mode 100644 index 0000000000..c6eb8b2a2b --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; + +public interface MessageDestination +{ + + public String getName(); + + /** + * Routes a message + * @param message the message to be routed + * @param instanceProperties the instance properties + * @param txn the transaction to enqueue within + * @param postEnqueueAction action to perform on the result of every enqueue (may be null) + * @return the number of queues in which the message was enqueued performed + */ + int send(ServerMessage message, + InstanceProperties instanceProperties, + ServerTransaction txn, + Action postEnqueueAction); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 733fded846..bbe80c1db7 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -45,9 +45,9 @@ public interface MessageInstance void decrementDeliveryCount(); - void addStateChangeListener(StateChangeListener listener); + void addStateChangeListener(StateChangeListener listener); - boolean removeStateChangeListener(StateChangeListener listener); + boolean removeStateChangeListener(StateChangeListener listener); boolean acquiredByConsumer(); @@ -71,7 +71,7 @@ public interface MessageInstance int getMaximumDeliveryCount(); - int routeToAlternate(Action action, ServerTransaction txn); + int routeToAlternate(Action action, ServerTransaction txn); Filterable asFilterable(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java new file mode 100644 index 0000000000..1abe3671ff --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.store.TransactionLogResource; + +import java.util.Collection; +import java.util.EnumSet; + +public interface MessageSource extends TransactionLogResource +{ + Consumer addConsumer(ConsumerTarget target, FilterManager filters, + Class messageClass, + String consumerName, EnumSet options) throws AMQException; + + Collection getConsumers(); + + void addConsumerRegistrationListener(ConsumerRegistrationListener listener); + + void removeConsumerRegistrationListener(ConsumerRegistrationListener listener); + + AuthorizationHolder getAuthorizationHolder(); + + void setAuthorizationHolder(AuthorizationHolder principalHolder); + + void setExclusiveOwningSession(AMQSessionModel owner); + + AMQSessionModel getExclusiveOwningSession(); + + boolean isExclusive(); + + void enqueue(ServerMessage message) throws AMQException; + + interface ConsumerRegistrationListener + { + void consumerAdded(AMQQueue queue, Consumer consumer); + void consumerRemoved(AMQQueue queue, Consumer consumer); + } + + /** + * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer + * already exists. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represent failure to create a consumer, because an exclusive consumer already exists. + *
+ * + * @todo Not an AMQP exception as no status code. + * + * @todo Move to top level, used outside this class. + */ + static final class ExistingExclusiveConsumer extends AMQException + { + + public ExistingExclusiveConsumer() + { + super(""); + } + } + + /** + * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer + * already exists. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represent failure to create an exclusive consumer, as a consumer already exists. + *
+ * + * @todo Not an AMQP exception as no status code. + * + * @todo Move to top level, used outside this class. + */ + static final class ExistingConsumerPreventsExclusive extends AMQException + { + public ExistingConsumerPreventsExclusive() + { + super(""); + } + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java new file mode 100644 index 0000000000..0ba3095243 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +public interface CapacityChecker +{ + void checkCapacity(AMQSessionModel channel); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 018ba454e4..4fe6117d88 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -25,25 +25,19 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeReferrer; -import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.consumer.Consumer; -import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; -import java.util.EnumSet; import java.util.List; import java.util.Set; -public interface AMQQueue extends Comparable, ExchangeReferrer, TransactionLogResource, BaseQueue +public interface AMQQueue extends Comparable, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker { - String getName(); public interface NotificationListener { @@ -75,29 +69,9 @@ public interface AMQQueue extends Comparable, ExchangeReferrer, Transa boolean isAutoDelete(); String getOwner(); - AuthorizationHolder getAuthorizationHolder(); - void setAuthorizationHolder(AuthorizationHolder principalHolder); - - void setExclusiveOwningSession(AMQSessionModel owner); - AMQSessionModel getExclusiveOwningSession(); VirtualHost getVirtualHost(); - Consumer addConsumer(final ConsumerTarget target, final FilterManager filters, - final Class messageClass, - final String consumerName, EnumSet options) throws AMQException; - - Collection getConsumers(); - - interface ConsumerRegistrationListener - { - void consumerAdded(AMQQueue queue, Consumer consumer); - void consumerRemoved(AMQQueue queue, Consumer consumer); - } - - void addConsumerRegistrationListener(ConsumerRegistrationListener listener); - void removeConsumerRegistrationListener(ConsumerRegistrationListener listener); - int getConsumerCount(); @@ -215,8 +189,6 @@ public interface AMQQueue extends Comparable, ExchangeReferrer, Transa void stop(); - boolean isExclusive(); - Exchange getAlternateExchange(); void setAlternateExchange(Exchange exchange); @@ -224,51 +196,6 @@ public interface AMQQueue extends Comparable, ExchangeReferrer, Transa Collection getAvailableAttributes(); Object getAttribute(String attrName); - void checkCapacity(AMQSessionModel channel); - - /** - * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer - * already exists. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Represent failure to create a consumer, because an exclusive consumer already exists. - *
- * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - static final class ExistingExclusiveConsumer extends AMQException - { - - public ExistingExclusiveConsumer() - { - super(""); - } - } - - /** - * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer - * already exists. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Represent failure to create an exclusive consumer, as a consumer already exists. - *
- * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - static final class ExistingConsumerPreventsExclusive extends AMQException - { - public ExistingConsumerPreventsExclusive() - { - super(""); - } - } - void configure(QueueConfiguration config); void setExclusive(boolean exclusive); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index bce2bd67cc..972488da4b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; @@ -29,7 +30,7 @@ import org.apache.qpid.server.util.Action; public interface BaseQueue extends TransactionLogResource { void enqueue(ServerMessage message) throws AMQException; - void enqueue(ServerMessage message, Action action) throws AMQException; + void enqueue(ServerMessage message, Action action) throws AMQException; boolean isDurable(); boolean isDeleted(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index d5c03abc93..22cb6aeb7b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,19 +241,19 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return groupVal; } - private class GroupStateChangeListener implements StateChangeListener + private class GroupStateChangeListener implements StateChangeListener { private final Group _group; public GroupStateChangeListener(final Group group, - final QueueEntry entry) + final MessageInstance entry) { _group = group; } - public void stateChanged(final QueueEntry entry, - final QueueEntry.State oldState, - final QueueEntry.State newState) + public void stateChanged(final MessageInstance entry, + final MessageInstance.State oldState, + final MessageInstance.State newState) { synchronized (DefinedGroupMessageGroupManager.this) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 7908cc9de7..93bb3a8c61 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; @@ -62,7 +63,7 @@ public abstract class QueueEntryImpl implements QueueEntry (QueueEntryImpl.class, EntryState.class, "_state"); - private volatile Set> _stateChangeListeners; + private volatile Set> _stateChangeListeners; private static final AtomicReferenceFieldUpdater @@ -332,7 +333,7 @@ public abstract class QueueEntryImpl implements QueueEntry private void notifyStateChange(final State oldState, final State newState) { - for(StateChangeListener l : _stateChangeListeners) + for(StateChangeListener l : _stateChangeListeners) { l.stateChanged(this, oldState, newState); } @@ -363,7 +364,7 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public int routeToAlternate(final Action action, ServerTransaction txn) + public int routeToAlternate(final Action action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); @@ -408,21 +409,21 @@ public abstract class QueueEntryImpl implements QueueEntry return getQueue().isDeleted(); } - public void addStateChangeListener(StateChangeListener listener) + public void addStateChangeListener(StateChangeListener listener) { - Set> listeners = _stateChangeListeners; + Set> listeners = _stateChangeListeners; if(listeners == null) { - _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet>()); + _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet>()); listeners = _stateChangeListeners; } listeners.add(listener); } - public boolean removeStateChangeListener(StateChangeListener listener) + public boolean removeStateChangeListener(StateChangeListener listener) { - Set> listeners = _stateChangeListeners; + Set> listeners = _stateChangeListeners; if(listeners != null) { return listeners.remove(listener); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 78585997be..7435c690b3 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -635,7 +636,7 @@ public class SimpleAMQQueue implements AMQQueue, enqueue(message, null); } - public void enqueue(ServerMessage message, Action action) throws AMQException + public void enqueue(ServerMessage message, Action action) throws AMQException { incrementQueueCount(); incrementQueueSize(message); @@ -1967,7 +1968,7 @@ public class SimpleAMQQueue implements AMQQueue, return _notificationChecks; } - private final class QueueEntryListener implements StateChangeListener + private final class QueueEntryListener implements StateChangeListener { private final QueueConsumer _sub; @@ -1988,7 +1989,7 @@ public class SimpleAMQQueue implements AMQQueue, return System.identityHashCode(_sub); } - public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) + public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState) { entry.removeStateChangeListener(this); deliverAsync(_sub); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 44bda5182a..3185abc6cd 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -20,6 +20,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -49,7 +50,7 @@ public class SortedQueue extends OutOfOrderQueue return _sortedPropertyName; } - public void enqueue(ServerMessage message, Action action) throws AMQException + public void enqueue(ServerMessage message, Action action) throws AMQException { synchronized (_sortedQueueLock) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 03717ed6ae..57c67f54cd 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -212,7 +212,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { Transaction txn = null; try diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index b9d91a647b..4ea48c6a24 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -154,7 +154,7 @@ public class AutoCommitTransaction implements ServerTransaction } - public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { Transaction txn = null; try diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java index 238facf4b5..4a7c16a7cd 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java @@ -105,7 +105,7 @@ public class DistributedTransaction implements ServerTransaction } } - public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { if(_branch != null) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java index ada4eeb553..2505548ab8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -363,7 +363,7 @@ public class DtxBranch } - public void enqueue(BaseQueue queue, EnqueueableMessage message) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message) { _enqueueRecords.add(new Record(queue, message)); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 93482153d3..4b02d4f8ec 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -197,7 +197,7 @@ public class LocalTransaction implements ServerTransaction } } - public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { sync(); _postTransactionActions.add(postTransactionAction); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index 3355a7ed06..cae5fa73bf 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -94,7 +94,7 @@ public interface ServerTransaction * * A store operation will result only for a persistent message on a durable queue. */ - void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction); + void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction); /** * Enqueue a message(s) to queue(s) registering a post transaction action. diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 5859ce3c68..99b7407bde 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -47,6 +47,8 @@ import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -440,6 +442,12 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg return _queueRegistry.getQueue(name); } + @Override + public MessageSource getMessageSource(final String name) + { + return getQueue(name); + } + @Override public AMQQueue getQueue(UUID id) { @@ -524,6 +532,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } + + @Override + public MessageDestination getMessageDestination(final String name) + { + return getExchange(name); + } + @Override public Exchange getExchange(String name) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 2ebbedccd4..0c1b949e62 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -30,6 +30,8 @@ import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; @@ -49,6 +51,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable String getName(); AMQQueue getQueue(String name); + MessageSource getMessageSource(String name); AMQQueue getQueue(UUID id); @@ -76,6 +79,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable void removeExchange(Exchange exchange, boolean force) throws AMQException; + MessageDestination getMessageDestination(String name); Exchange getExchange(String name); Exchange getExchange(UUID id); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 36fd92004a..1158781beb 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; @@ -305,7 +306,7 @@ public class MockAMQQueue implements AMQQueue { } - public void enqueue(ServerMessage message, Action action) throws AMQException + public void enqueue(ServerMessage message, Action action) throws AMQException { } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index b3a124a6bd..95a6030d6a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.consumer.Consumer; @@ -62,7 +63,7 @@ public class MockQueueEntry implements QueueEntry return false; } - public void addStateChangeListener(StateChangeListener listener) + public void addStateChangeListener(StateChangeListener listener) { } @@ -72,7 +73,7 @@ public class MockQueueEntry implements QueueEntry } - public int routeToAlternate(final Action action, final ServerTransaction txn) + public int routeToAlternate(final Action action, final ServerTransaction txn) { return 0; } @@ -152,7 +153,7 @@ public class MockQueueEntry implements QueueEntry } - public boolean removeStateChangeListener(StateChangeListener listener) + public boolean removeStateChangeListener(StateChangeListener listener) { return false; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 542f6ba0d1..62120f26d3 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -245,11 +245,11 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.SEES_REQUEUES)); final ArrayList queueEntries = new ArrayList(); - Action postEnqueueAction = new Action() + Action postEnqueueAction = new Action() { - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - queueEntries.add(entry); + queueEntries.add((QueueEntry) entry); } }; @@ -298,11 +298,11 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.ACQUIRES)); final ArrayList queueEntries = new ArrayList(); - Action postEnqueueAction = new Action() + Action postEnqueueAction = new Action() { - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - queueEntries.add(entry); + queueEntries.add((QueueEntry) entry); } }; @@ -356,11 +356,11 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.SEES_REQUEUES)); final ArrayList queueEntries = new ArrayList(); - Action postEnqueueAction = new Action() + Action postEnqueueAction = new Action() { - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - queueEntries.add(entry); + queueEntries.add((QueueEntry) entry); } }; @@ -420,11 +420,11 @@ public class SimpleAMQQueueTest extends QpidTestCase final ArrayList queueEntries = new ArrayList(); - Action postEnqueueAction = new Action() + Action postEnqueueAction = new Action() { - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - queueEntries.add(entry); + queueEntries.add((QueueEntry)entry); } }; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 1ca7ff1b65..832b89c81a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -27,6 +27,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; @@ -126,6 +128,12 @@ public class MockVirtualHost implements VirtualHost return null; } + @Override + public MessageSource getMessageSource(final String name) + { + return null; + } + @Override public AMQQueue getQueue(UUID id) { @@ -173,6 +181,12 @@ public class MockVirtualHost implements VirtualHost { } + @Override + public MessageDestination getMessageDestination(final String name) + { + return null; + } + @Override public Exchange getExchange(String name) { diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 2093490ee2..c478956629 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -385,13 +385,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC final LogActor logActor = CurrentActor.get(); final ServerMessage msg = entry.getMessage(); - int requeues = entry.routeToAlternate(new Action() + int requeues = entry.routeToAlternate(new Action() { @Override - public void performAction(final QueueEntry requeueEntry) + public void performAction(final MessageInstance requeueEntry) { logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), - requeueEntry.getQueue().getName())); + requeueEntry.getOwningResource().getName())); } }, null); diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 67b3740bb6..b5c4724292 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -46,7 +46,6 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -55,14 +54,16 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.DistributedTransaction; @@ -104,12 +105,16 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - private final Action _checkCapacityAction = new Action() + private final Action _checkCapacityAction = new Action() { @Override - public void performAction(final QueueEntry entry) + public void performAction(final MessageInstance entry) { - entry.getQueue().checkCapacity(ServerSession.this); + TransactionLogResource queue = entry.getOwningResource(); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(ServerSession.this); + } } }; @@ -188,7 +193,7 @@ public class ServerSession extends Session public int enqueue(final MessageTransferMessage message, final InstanceProperties instanceProperties, - final Exchange exchange) + final MessageDestination exchange) { if(_outstandingCredit.get() != UNLIMITED_CREDIT && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index d1b24a2fdb..9a90b74656 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -35,7 +35,9 @@ import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; @@ -196,7 +198,7 @@ public class ServerSessionDelegate extends SessionDelegate String queueName = method.getQueue(); VirtualHost vhost = getVirtualHost(session); - final AMQQueue queue = vhost.getQueue(queueName); + final MessageSource queue = vhost.getMessageSource(queueName); if(queue == null) { @@ -308,7 +310,7 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageTransfer(Session ssn, final MessageTransfer xfr) { - final Exchange exchange = getExchangeForMessage(ssn, xfr); + final MessageDestination exchange = getDestinationForMessage(ssn, xfr); final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration()) @@ -327,7 +329,6 @@ public class ServerSessionDelegate extends SessionDelegate return; } - final Exchange exchangeInUse; final MessageStore store = getVirtualHost(ssn).getMessageStore(); final StoredMessage storeMessage = createStoreMessage(xfr, messageMetaData, store); final ServerSession serverSession = (ServerSession) ssn; @@ -829,24 +830,24 @@ public class ServerSessionDelegate extends SessionDelegate return getVirtualHost(session).getExchange(exchangeName); } - private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr) + private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr) { VirtualHost virtualHost = getVirtualHost(ssn); - Exchange exchange; + MessageDestination destination; if(xfr.hasDestination()) { - exchange = virtualHost.getExchange(xfr.getDestination()); - if(exchange == null) + destination = virtualHost.getMessageDestination(xfr.getDestination()); + if(destination == null) { - exchange = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultExchange(); } } else { - exchange = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultExchange(); } - return exchange; + return destination; } private VirtualHost getVirtualHost(Session session) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index d2b1f83513..dc9a6484fa 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -56,9 +56,12 @@ import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -256,7 +259,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return _channelId; } - public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException + public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException { String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString(); SecurityManager securityManager = getVirtualHost().getSecurityManager(); @@ -265,7 +268,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F throw new AMQSecurityException("Permission denied: " + e.getName()); } _currentMessage = new IncomingMessage(info); - _currentMessage.setExchange(e); + _currentMessage.setMessageDestination(e); } public void publishContentHeader(ContentHeaderBody contentHeaderBody) @@ -350,7 +353,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } }; - int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction, + int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction, immediate ? _immediateAction : _capacityCheckAction); if(enqueues == 0) { @@ -497,19 +500,19 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. * + * * @param tag the tag chosen by the client (if null, server will generate one) - * @param queue the queue to subscribe to + * @param source the queue to subscribe to * @param acks Are acks enabled for this subscriber * @param filters Filters to apply to this subscriber * - * @param noLocal Flag stopping own messages being received. * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * * @throws AMQException if something goes wrong */ - public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException + public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, + FieldTable filters, boolean exclusive) throws AMQException { if (tag == null) { @@ -557,7 +560,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F try { Consumer sub = - queue.addConsumer(target, + source.addConsumer(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), @@ -1189,16 +1192,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - private class ImmediateAction implements Action + private class ImmediateAction implements Action { public ImmediateAction() { } - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - AMQQueue queue = entry.getQueue(); + TransactionLogResource queue = entry.getOwningResource(); if (!entry.getDeliveredToConsumer() && entry.acquire()) { @@ -1246,19 +1249,25 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } else { - queue.checkCapacity(AMQChannel.this); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(AMQChannel.this); + } } } } - private final class CapacityCheckAction implements Action + private final class CapacityCheckAction implements Action { @Override - public void performAction(final QueueEntry entry) + public void performAction(final MessageInstance entry) { - AMQQueue queue = entry.getQueue(); - queue.checkCapacity(AMQChannel.this); + TransactionLogResource queue = entry.getOwningResource(); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(AMQChannel.this); + } } } @@ -1477,13 +1486,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F final ServerMessage msg = rejectedQueueEntry.getMessage(); final Consumer sub = rejectedQueueEntry.getDeliveredConsumer(); - int requeues = rejectedQueueEntry.routeToAlternate(new Action() + int requeues = rejectedQueueEntry.routeToAlternate(new Action() { @Override - public void performAction(final QueueEntry requeueEntry) + public void performAction(final MessageInstance requeueEntry) { _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), - requeueEntry.getQueue().getName())); + requeueEntry.getOwningResource().getName())); } }, null); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 536bc18964..47700f812f 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -49,13 +49,13 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener { - private final StateChangeListener _entryReleaseListener = - new StateChangeListener() + private final StateChangeListener _entryReleaseListener = + new StateChangeListener() { @Override - public void stateChanged(final QueueEntry entry, - final QueueEntry.State oldSate, - final QueueEntry.State newState) + public void stateChanged(final MessageInstance entry, + final MessageInstance.State oldSate, + final MessageInstance.State newState) { if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED)) { @@ -463,7 +463,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen _creditManager.restoreCredit(1, message.getSize()); } - protected final StateChangeListener getReleasedStateChangeListener() + protected final StateChangeListener getReleasedStateChangeListener() { return _entryReleaseListener; } @@ -526,11 +526,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen final long size = entry.getMessage().getSize(); _unacknowledgedBytes.addAndGet(size); _unacknowledgedCount.incrementAndGet(); - entry.addStateChangeListener(new StateChangeListener() + entry.addStateChangeListener(new StateChangeListener() { - public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState) + public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState) { - if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED)) + if(oldState.equals(MessageInstance.State.ACQUIRED) && !newState.equals(MessageInstance.State.ACQUIRED)) { _unacknowledgedBytes.addAndGet(-size); _unacknowledgedCount.decrementAndGet(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java index 5a9a51ff59..80c4c77b65 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java @@ -20,15 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.message.MessageDestination; import java.util.ArrayList; import java.util.List; @@ -38,7 +35,7 @@ public class IncomingMessage private final MessagePublishInfo _messagePublishInfo; private ContentHeaderBody _contentHeaderBody; - private Exchange _exchange; + private MessageDestination _messageDestination; /** * Keeps a track of how many bytes we have received in body frames @@ -77,9 +74,9 @@ public class IncomingMessage return _messagePublishInfo.getExchange(); } - public Exchange getExchange() + public MessageDestination getDestination() { - return _exchange; + return _messageDestination; } public ContentHeaderBody getContentHeader() @@ -92,9 +89,9 @@ public class IncomingMessage return getContentHeader().getBodySize(); } - public void setExchange(final Exchange e) + public void setMessageDestination(final MessageDestination e) { - _exchange = e; + _messageDestination = e; } public int getBodyCount() throws AMQException diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index c2d02c1df8..526bc9b9fe 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener _resumeFullTransfers = new ArrayList(); private List _resumeAcceptedTransfers = new ArrayList(); private Runnable _closeAction; - private final AMQQueue _queue; + private final MessageSource _queue; public SendingLink_1_0(final SendingLinkAttachment linkAttachment, @@ -121,7 +122,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { _queue = ((QueueDestination) _destination).getQueue(); - if(_queue.getAvailableAttributes().contains("topic")) + if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic")) { source.setDistributionMode(StdDistMode.COPY); } @@ -217,7 +218,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { - _queue = _vhost.createQueue( + queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(name, _vhost.getName()), name, isDurable, @@ -229,8 +230,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } else { - _queue = queue; - List bindings = _queue.getBindings(); + List bindings = queue.getBindings(); List bindingsToRemove = new ArrayList(); for(Binding existingBinding : bindings) { @@ -313,15 +313,16 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } } } + _queue = queue; source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - exchange.addBinding(binding, _queue,null); + exchange.addBinding(binding, queue,null); source.setDistributionMode(StdDistMode.COPY); if(!isDurable) { final String queueName = name; - final AMQQueue tempQueue = _queue; + final AMQQueue tempQueue = queue; final Action deleteQueueTask = new Action() @@ -345,7 +346,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - _queue.addQueueDeleteTask(new Action() + queue.addQueueDeleteTask(new Action() { public void performAction(AMQQueue queue) { @@ -356,7 +357,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS }); } - qd = new QueueDestination(_queue); + qd = new QueueDestination(queue); } catch (AMQSecurityException e) { @@ -454,7 +455,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { try { - _queue.getVirtualHost().removeQueue(_queue); + _vhost.removeQueue((AMQQueue)_queue); } catch(AMQException e) { diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 51ff9c13cb..c7508fa913 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -41,6 +41,8 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -109,7 +111,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu source.setAddress(tempQueue.getName()); } String addr = source.getAddress(); - AMQQueue queue = _vhost.getQueue(addr); + MessageSource queue = _vhost.getMessageSource(addr); if(queue != null) { @@ -250,11 +252,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } String addr = target.getAddress(); - Exchange exchg = _vhost.getExchange(addr); - if(exchg != null) + MessageDestination messageDestination = _vhost.getMessageDestination(addr); + if(messageDestination != null) { - destination = new ExchangeDestination(exchg, target.getDurable(), - target.getExpiryPolicy()); + destination = new NodeReceivingDestination(messageDestination, target.getDurable(), + target.getExpiryPolicy()); } else { -- cgit v1.2.1