diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-05 11:59:49 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-05 11:59:49 +0000 |
commit | 2caac132114f8f1f7877600b7bef21bb3681fdd3 (patch) | |
tree | 0a0a629b30d499e95aff20280f4242ba80923d42 | |
parent | 7a54b9a25cf96675325a8cb6bfd1d2e4f43b8edd (diff) | |
download | qpid-python-2caac132114f8f1f7877600b7bef21bb3681fdd3.tar.gz |
Use abstractions for sources and destinations for message ingress and egress in all protocol transports
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564721 13f79535-47bb-0310-9956-ffa450edef68
39 files changed, 475 insertions, 223 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 20bde5f613..bc5cdaa268 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.logging.subjects.ExchangeLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; @@ -429,7 +430,7 @@ public abstract class AbstractExchange implements Exchange public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action<QueueEntry> postEnqueueAction) + final Action<MessageInstance> postEnqueueAction) { List<? extends BaseQueue> queues = route(message, instanceProperties); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index db3464c463..dd0121d91b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; @@ -336,7 +337,7 @@ public class DefaultExchange implements Exchange public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action<QueueEntry> postEnqueueAction) + final Action<MessageInstance> postEnqueueAction) { final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); if(q == null) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java index af1eed9032..6d83fdb2a1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -24,22 +24,16 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.UUID; -public interface Exchange extends ExchangeReferrer +public interface Exchange extends ExchangeReferrer, MessageDestination { void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete) throws AMQException; @@ -97,19 +91,6 @@ public interface Exchange extends ExchangeReferrer void close() throws AMQException; /** - * Routes a message - * @param message the message to be routed - * @param instanceProperties the instance properties - * @param txn the transaction to enqueue within - * @param postEnqueueAction action to perform on the result of every enqueue (may be null) - * @return the number of queues in which the message was enqueued performed - */ - int send(ServerMessage message, - InstanceProperties instanceProperties, - ServerTransaction txn, - Action<QueueEntry> postEnqueueAction); - - /** * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments * @param bindingKey * @param arguments diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java new file mode 100644 index 0000000000..c6eb8b2a2b --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; + +public interface MessageDestination +{ + + public String getName(); + + /** + * Routes a message + * @param message the message to be routed + * @param instanceProperties the instance properties + * @param txn the transaction to enqueue within + * @param postEnqueueAction action to perform on the result of every enqueue (may be null) + * @return the number of queues in which the message was enqueued performed + */ + int send(ServerMessage message, + InstanceProperties instanceProperties, + ServerTransaction txn, + Action<MessageInstance> postEnqueueAction); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 733fded846..bbe80c1db7 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -45,9 +45,9 @@ public interface MessageInstance void decrementDeliveryCount(); - void addStateChangeListener(StateChangeListener<QueueEntry, State> listener); + void addStateChangeListener(StateChangeListener<MessageInstance, State> listener); - boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener); + boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener); boolean acquiredByConsumer(); @@ -71,7 +71,7 @@ public interface MessageInstance int getMaximumDeliveryCount(); - int routeToAlternate(Action<QueueEntry> action, ServerTransaction txn); + int routeToAlternate(Action<MessageInstance> action, ServerTransaction txn); Filterable asFilterable(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java new file mode 100644 index 0000000000..1abe3671ff --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.store.TransactionLogResource; + +import java.util.Collection; +import java.util.EnumSet; + +public interface MessageSource extends TransactionLogResource +{ + Consumer addConsumer(ConsumerTarget target, FilterManager filters, + Class<? extends ServerMessage> messageClass, + String consumerName, EnumSet<Consumer.Option> options) throws AMQException; + + Collection<Consumer> getConsumers(); + + void addConsumerRegistrationListener(ConsumerRegistrationListener listener); + + void removeConsumerRegistrationListener(ConsumerRegistrationListener listener); + + AuthorizationHolder getAuthorizationHolder(); + + void setAuthorizationHolder(AuthorizationHolder principalHolder); + + void setExclusiveOwningSession(AMQSessionModel owner); + + AMQSessionModel getExclusiveOwningSession(); + + boolean isExclusive(); + + void enqueue(ServerMessage message) throws AMQException; + + interface ConsumerRegistrationListener + { + void consumerAdded(AMQQueue queue, Consumer consumer); + void consumerRemoved(AMQQueue queue, Consumer consumer); + } + + /** + * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer + * already exists. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent failure to create a consumer, because an exclusive consumer already exists. + * </table> + * + * @todo Not an AMQP exception as no status code. + * + * @todo Move to top level, used outside this class. + */ + static final class ExistingExclusiveConsumer extends AMQException + { + + public ExistingExclusiveConsumer() + { + super(""); + } + } + + /** + * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer + * already exists. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent failure to create an exclusive consumer, as a consumer already exists. + * </table> + * + * @todo Not an AMQP exception as no status code. + * + * @todo Move to top level, used outside this class. + */ + static final class ExistingConsumerPreventsExclusive extends AMQException + { + public ExistingConsumerPreventsExclusive() + { + super(""); + } + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java new file mode 100644 index 0000000000..0ba3095243 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +public interface CapacityChecker +{ + void checkCapacity(AMQSessionModel channel); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 018ba454e4..4fe6117d88 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -25,25 +25,19 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeReferrer; -import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.consumer.Consumer; -import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; -import java.util.EnumSet; import java.util.List; import java.util.Set; -public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue +public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker { - String getName(); public interface NotificationListener { @@ -75,29 +69,9 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa boolean isAutoDelete(); String getOwner(); - AuthorizationHolder getAuthorizationHolder(); - void setAuthorizationHolder(AuthorizationHolder principalHolder); - - void setExclusiveOwningSession(AMQSessionModel owner); - AMQSessionModel getExclusiveOwningSession(); VirtualHost getVirtualHost(); - Consumer addConsumer(final ConsumerTarget target, final FilterManager filters, - final Class<? extends ServerMessage> messageClass, - final String consumerName, EnumSet<Consumer.Option> options) throws AMQException; - - Collection<Consumer> getConsumers(); - - interface ConsumerRegistrationListener - { - void consumerAdded(AMQQueue queue, Consumer consumer); - void consumerRemoved(AMQQueue queue, Consumer consumer); - } - - void addConsumerRegistrationListener(ConsumerRegistrationListener listener); - void removeConsumerRegistrationListener(ConsumerRegistrationListener listener); - int getConsumerCount(); @@ -215,8 +189,6 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa void stop(); - boolean isExclusive(); - Exchange getAlternateExchange(); void setAlternateExchange(Exchange exchange); @@ -224,51 +196,6 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa Collection<String> getAvailableAttributes(); Object getAttribute(String attrName); - void checkCapacity(AMQSessionModel channel); - - /** - * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer - * already exists. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create a consumer, because an exclusive consumer already exists. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - static final class ExistingExclusiveConsumer extends AMQException - { - - public ExistingExclusiveConsumer() - { - super(""); - } - } - - /** - * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer - * already exists. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create an exclusive consumer, as a consumer already exists. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - static final class ExistingConsumerPreventsExclusive extends AMQException - { - public ExistingConsumerPreventsExclusive() - { - super(""); - } - } - void configure(QueueConfiguration config); void setExclusive(boolean exclusive); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index bce2bd67cc..972488da4b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; @@ -29,7 +30,7 @@ import org.apache.qpid.server.util.Action; public interface BaseQueue extends TransactionLogResource { void enqueue(ServerMessage message) throws AMQException; - void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException; + void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException; boolean isDurable(); boolean isDeleted(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index d5c03abc93..22cb6aeb7b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,19 +241,19 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return groupVal; } - private class GroupStateChangeListener implements StateChangeListener<QueueEntry, QueueEntry.State> + private class GroupStateChangeListener implements StateChangeListener<MessageInstance, QueueEntry.State> { private final Group _group; public GroupStateChangeListener(final Group group, - final QueueEntry entry) + final MessageInstance entry) { _group = group; } - public void stateChanged(final QueueEntry entry, - final QueueEntry.State oldState, - final QueueEntry.State newState) + public void stateChanged(final MessageInstance entry, + final MessageInstance.State oldState, + final MessageInstance.State newState) { synchronized (DefinedGroupMessageGroupManager.this) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 7908cc9de7..93bb3a8c61 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; @@ -62,7 +63,7 @@ public abstract class QueueEntryImpl implements QueueEntry (QueueEntryImpl.class, EntryState.class, "_state"); - private volatile Set<StateChangeListener<QueueEntry, State>> _stateChangeListeners; + private volatile Set<StateChangeListener<MessageInstance, State>> _stateChangeListeners; private static final AtomicReferenceFieldUpdater<QueueEntryImpl, Set> @@ -332,7 +333,7 @@ public abstract class QueueEntryImpl implements QueueEntry private void notifyStateChange(final State oldState, final State newState) { - for(StateChangeListener<QueueEntry, State> l : _stateChangeListeners) + for(StateChangeListener<MessageInstance, State> l : _stateChangeListeners) { l.stateChanged(this, oldState, newState); } @@ -363,7 +364,7 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn) + public int routeToAlternate(final Action<MessageInstance> action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); @@ -408,21 +409,21 @@ public abstract class QueueEntryImpl implements QueueEntry return getQueue().isDeleted(); } - public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener) + public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener) { - Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners; + Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners; if(listeners == null) { - _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<QueueEntry, State>>()); + _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance, State>>()); listeners = _stateChangeListeners; } listeners.add(listener); } - public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener) + public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener) { - Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners; + Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners; if(listeners != null) { return listeners.remove(listener); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 78585997be..7435c690b3 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -635,7 +636,7 @@ public class SimpleAMQQueue implements AMQQueue, enqueue(message, null); } - public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException + public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException { incrementQueueCount(); incrementQueueSize(message); @@ -1967,7 +1968,7 @@ public class SimpleAMQQueue implements AMQQueue, return _notificationChecks; } - private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State> + private final class QueueEntryListener implements StateChangeListener<MessageInstance, QueueEntry.State> { private final QueueConsumer _sub; @@ -1988,7 +1989,7 @@ public class SimpleAMQQueue implements AMQQueue, return System.identityHashCode(_sub); } - public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) + public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState) { entry.removeStateChangeListener(this); deliverAsync(_sub); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 44bda5182a..3185abc6cd 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -20,6 +20,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -49,7 +50,7 @@ public class SortedQueue extends OutOfOrderQueue return _sortedPropertyName; } - public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException + public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException { synchronized (_sortedQueueLock) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 03717ed6ae..57c67f54cd 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -212,7 +212,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { Transaction txn = null; try diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index b9d91a647b..4ea48c6a24 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -154,7 +154,7 @@ public class AutoCommitTransaction implements ServerTransaction } - public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { Transaction txn = null; try diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java index 238facf4b5..4a7c16a7cd 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java @@ -105,7 +105,7 @@ public class DistributedTransaction implements ServerTransaction } } - public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { if(_branch != null) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java index ada4eeb553..2505548ab8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -363,7 +363,7 @@ public class DtxBranch } - public void enqueue(BaseQueue queue, EnqueueableMessage message) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message) { _enqueueRecords.add(new Record(queue, message)); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 93482153d3..4b02d4f8ec 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -197,7 +197,7 @@ public class LocalTransaction implements ServerTransaction } } - public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { sync(); _postTransactionActions.add(postTransactionAction); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index 3355a7ed06..cae5fa73bf 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -94,7 +94,7 @@ public interface ServerTransaction * * A store operation will result only for a persistent message on a durable queue. */ - void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction); + void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction); /** * Enqueue a message(s) to queue(s) registering a post transaction action. diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 5859ce3c68..99b7407bde 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -47,6 +47,8 @@ import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -441,6 +443,12 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } @Override + public MessageSource getMessageSource(final String name) + { + return getQueue(name); + } + + @Override public AMQQueue getQueue(UUID id) { return _queueRegistry.getQueue(id); @@ -524,6 +532,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } + + @Override + public MessageDestination getMessageDestination(final String name) + { + return getExchange(name); + } + @Override public Exchange getExchange(String name) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 2ebbedccd4..0c1b949e62 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -30,6 +30,8 @@ import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; @@ -49,6 +51,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable String getName(); AMQQueue getQueue(String name); + MessageSource getMessageSource(String name); AMQQueue getQueue(UUID id); @@ -76,6 +79,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable void removeExchange(Exchange exchange, boolean force) throws AMQException; + MessageDestination getMessageDestination(String name); Exchange getExchange(String name); Exchange getExchange(UUID id); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 36fd92004a..1158781beb 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; @@ -305,7 +306,7 @@ public class MockAMQQueue implements AMQQueue { } - public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException + public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException { } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index b3a124a6bd..95a6030d6a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.consumer.Consumer; @@ -62,7 +63,7 @@ public class MockQueueEntry implements QueueEntry return false; } - public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener) + public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener) { } @@ -72,7 +73,7 @@ public class MockQueueEntry implements QueueEntry } - public int routeToAlternate(final Action<QueueEntry> action, final ServerTransaction txn) + public int routeToAlternate(final Action<MessageInstance> action, final ServerTransaction txn) { return 0; } @@ -152,7 +153,7 @@ public class MockQueueEntry implements QueueEntry } - public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener) + public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener) { return false; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 542f6ba0d1..62120f26d3 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -245,11 +245,11 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() + Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>() { - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - queueEntries.add(entry); + queueEntries.add((QueueEntry) entry); } }; @@ -298,11 +298,11 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.ACQUIRES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() + Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>() { - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - queueEntries.add(entry); + queueEntries.add((QueueEntry) entry); } }; @@ -356,11 +356,11 @@ public class SimpleAMQQueueTest extends QpidTestCase Consumer.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() + Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>() { - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - queueEntries.add(entry); + queueEntries.add((QueueEntry) entry); } }; @@ -420,11 +420,11 @@ public class SimpleAMQQueueTest extends QpidTestCase final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() + Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>() { - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - queueEntries.add(entry); + queueEntries.add((QueueEntry)entry); } }; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 1ca7ff1b65..832b89c81a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -27,6 +27,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; @@ -127,6 +129,12 @@ public class MockVirtualHost implements VirtualHost } @Override + public MessageSource getMessageSource(final String name) + { + return null; + } + + @Override public AMQQueue getQueue(UUID id) { return null; @@ -174,6 +182,12 @@ public class MockVirtualHost implements VirtualHost } @Override + public MessageDestination getMessageDestination(final String name) + { + return null; + } + + @Override public Exchange getExchange(String name) { return null; diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 2093490ee2..c478956629 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -385,13 +385,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC final LogActor logActor = CurrentActor.get(); final ServerMessage msg = entry.getMessage(); - int requeues = entry.routeToAlternate(new Action<QueueEntry>() + int requeues = entry.routeToAlternate(new Action<MessageInstance>() { @Override - public void performAction(final QueueEntry requeueEntry) + public void performAction(final MessageInstance requeueEntry) { logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), - requeueEntry.getQueue().getName())); + requeueEntry.getOwningResource().getName())); } }, null); diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 67b3740bb6..b5c4724292 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -46,7 +46,6 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -55,14 +54,16 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.DistributedTransaction; @@ -104,12 +105,16 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - private final Action<QueueEntry> _checkCapacityAction = new Action<QueueEntry>() + private final Action<MessageInstance> _checkCapacityAction = new Action<MessageInstance>() { @Override - public void performAction(final QueueEntry entry) + public void performAction(final MessageInstance entry) { - entry.getQueue().checkCapacity(ServerSession.this); + TransactionLogResource queue = entry.getOwningResource(); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(ServerSession.this); + } } }; @@ -188,7 +193,7 @@ public class ServerSession extends Session public int enqueue(final MessageTransferMessage message, final InstanceProperties instanceProperties, - final Exchange exchange) + final MessageDestination exchange) { if(_outstandingCredit.get() != UNLIMITED_CREDIT && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index d1b24a2fdb..9a90b74656 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -35,7 +35,9 @@ import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; @@ -196,7 +198,7 @@ public class ServerSessionDelegate extends SessionDelegate String queueName = method.getQueue(); VirtualHost vhost = getVirtualHost(session); - final AMQQueue queue = vhost.getQueue(queueName); + final MessageSource queue = vhost.getMessageSource(queueName); if(queue == null) { @@ -308,7 +310,7 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageTransfer(Session ssn, final MessageTransfer xfr) { - final Exchange exchange = getExchangeForMessage(ssn, xfr); + final MessageDestination exchange = getDestinationForMessage(ssn, xfr); final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration()) @@ -327,7 +329,6 @@ public class ServerSessionDelegate extends SessionDelegate return; } - final Exchange exchangeInUse; final MessageStore store = getVirtualHost(ssn).getMessageStore(); final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); final ServerSession serverSession = (ServerSession) ssn; @@ -829,24 +830,24 @@ public class ServerSessionDelegate extends SessionDelegate return getVirtualHost(session).getExchange(exchangeName); } - private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr) + private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr) { VirtualHost virtualHost = getVirtualHost(ssn); - Exchange exchange; + MessageDestination destination; if(xfr.hasDestination()) { - exchange = virtualHost.getExchange(xfr.getDestination()); - if(exchange == null) + destination = virtualHost.getMessageDestination(xfr.getDestination()); + if(destination == null) { - exchange = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultExchange(); } } else { - exchange = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultExchange(); } - return exchange; + return destination; } private VirtualHost getVirtualHost(Session session) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index d2b1f83513..dc9a6484fa 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -56,9 +56,12 @@ import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -256,7 +259,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return _channelId; } - public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException + public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException { String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString(); SecurityManager securityManager = getVirtualHost().getSecurityManager(); @@ -265,7 +268,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F throw new AMQSecurityException("Permission denied: " + e.getName()); } _currentMessage = new IncomingMessage(info); - _currentMessage.setExchange(e); + _currentMessage.setMessageDestination(e); } public void publishContentHeader(ContentHeaderBody contentHeaderBody) @@ -350,7 +353,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } }; - int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction, + int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction, immediate ? _immediateAction : _capacityCheckAction); if(enqueues == 0) { @@ -497,19 +500,19 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. * + * * @param tag the tag chosen by the client (if null, server will generate one) - * @param queue the queue to subscribe to + * @param source the queue to subscribe to * @param acks Are acks enabled for this subscriber * @param filters Filters to apply to this subscriber * - * @param noLocal Flag stopping own messages being received. * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * * @throws AMQException if something goes wrong */ - public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException + public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, + FieldTable filters, boolean exclusive) throws AMQException { if (tag == null) { @@ -557,7 +560,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F try { Consumer sub = - queue.addConsumer(target, + source.addConsumer(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), @@ -1189,16 +1192,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - private class ImmediateAction implements Action<QueueEntry> + private class ImmediateAction implements Action<MessageInstance> { public ImmediateAction() { } - public void performAction(QueueEntry entry) + public void performAction(MessageInstance entry) { - AMQQueue queue = entry.getQueue(); + TransactionLogResource queue = entry.getOwningResource(); if (!entry.getDeliveredToConsumer() && entry.acquire()) { @@ -1246,19 +1249,25 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } else { - queue.checkCapacity(AMQChannel.this); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(AMQChannel.this); + } } } } - private final class CapacityCheckAction implements Action<QueueEntry> + private final class CapacityCheckAction implements Action<MessageInstance> { @Override - public void performAction(final QueueEntry entry) + public void performAction(final MessageInstance entry) { - AMQQueue queue = entry.getQueue(); - queue.checkCapacity(AMQChannel.this); + TransactionLogResource queue = entry.getOwningResource(); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(AMQChannel.this); + } } } @@ -1477,13 +1486,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F final ServerMessage msg = rejectedQueueEntry.getMessage(); final Consumer sub = rejectedQueueEntry.getDeliveredConsumer(); - int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>() + int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>() { @Override - public void performAction(final QueueEntry requeueEntry) + public void performAction(final MessageInstance requeueEntry) { _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), - requeueEntry.getQueue().getName())); + requeueEntry.getOwningResource().getName())); } }, null); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 536bc18964..47700f812f 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -49,13 +49,13 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener { - private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener = - new StateChangeListener<QueueEntry, QueueEntry.State>() + private final StateChangeListener<MessageInstance, MessageInstance.State> _entryReleaseListener = + new StateChangeListener<MessageInstance, MessageInstance.State>() { @Override - public void stateChanged(final QueueEntry entry, - final QueueEntry.State oldSate, - final QueueEntry.State newState) + public void stateChanged(final MessageInstance entry, + final MessageInstance.State oldSate, + final MessageInstance.State newState) { if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED)) { @@ -463,7 +463,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen _creditManager.restoreCredit(1, message.getSize()); } - protected final StateChangeListener<QueueEntry, QueueEntry.State> getReleasedStateChangeListener() + protected final StateChangeListener<MessageInstance, MessageInstance.State> getReleasedStateChangeListener() { return _entryReleaseListener; } @@ -526,11 +526,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen final long size = entry.getMessage().getSize(); _unacknowledgedBytes.addAndGet(size); _unacknowledgedCount.incrementAndGet(); - entry.addStateChangeListener(new StateChangeListener<QueueEntry, QueueEntry.State>() + entry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>() { - public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState) + public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState) { - if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED)) + if(oldState.equals(MessageInstance.State.ACQUIRED) && !newState.equals(MessageInstance.State.ACQUIRED)) { _unacknowledgedBytes.addAndGet(-size); _unacknowledgedCount.decrementAndGet(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java index 5a9a51ff59..80c4c77b65 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java @@ -20,15 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.message.MessageDestination; import java.util.ArrayList; import java.util.List; @@ -38,7 +35,7 @@ public class IncomingMessage private final MessagePublishInfo _messagePublishInfo; private ContentHeaderBody _contentHeaderBody; - private Exchange _exchange; + private MessageDestination _messageDestination; /** * Keeps a track of how many bytes we have received in body frames @@ -77,9 +74,9 @@ public class IncomingMessage return _messagePublishInfo.getExchange(); } - public Exchange getExchange() + public MessageDestination getDestination() { - return _exchange; + return _messageDestination; } public ContentHeaderBody getContentHeader() @@ -92,9 +89,9 @@ public class IncomingMessage return getContentHeader().getBodySize(); } - public void setExchange(final Exchange e) + public void setMessageDestination(final MessageDestination e) { - _exchange = e; + _messageDestination = e; } public int getBodyCount() throws AMQException diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index c2d02c1df8..526bc9b9fe 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic " args:" + body.getArguments()); } - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString()); + MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString()); if (queue == null) { @@ -120,8 +121,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic if(consumerTagName == null || channel.getSubscription(consumerTagName) == null) { - AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(), - body.getArguments(), body.getNoLocal(), body.getExclusive()); + AMQShortString consumerTag = channel.consumeFromSource(consumerTagName, + queue, + !body.getNoAck(), + body.getArguments(), + body.getExclusive()); if (!body.getNowait()) { MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java index 497e97db3e..f8a7722447 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; @@ -67,7 +68,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic } VirtualHost vHost = session.getVirtualHost(); - Exchange exch = vHost.getExchange(exchangeName.toString()); + MessageDestination exch = vHost.getMessageDestination(exchangeName.toString()); // if the exchange does not exist we raise a channel exception if (exch == null) { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index bb5fecdfb4..281f7345ff 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -140,7 +140,7 @@ public class AcknowledgeTest extends QpidTestCase assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); //Subscribe to the queue - AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true); + AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true); getQueue().deliverAsync(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index 247bc53cd1..479c715b2a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -144,6 +144,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase FieldTable filters = new FieldTable(); filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - return channel.subscribeToQueue(null, queue, true, filters, false, true); + return channel.consumeFromSource(null, queue, true, filters, true); } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java new file mode 100644 index 0000000000..70f659b546 --- /dev/null +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.Rejected; +import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; +import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.txn.ServerTransaction; + +public class NodeReceivingDestination implements ReceivingDestination +{ + private static final Accepted ACCEPTED = new Accepted(); + public static final Rejected REJECTED = new Rejected(); + private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED}; + + private MessageDestination _exchange; + private TerminusDurability _durability; + private TerminusExpiryPolicy _expiryPolicy; + + public NodeReceivingDestination(MessageDestination exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy) + { + _exchange = exchange; + _durability = durable; + _expiryPolicy = expiryPolicy; + } + + public Outcome[] getOutcomes() + { + return OUTCOMES; + } + + public Outcome send(final Message_1_0 message, ServerTransaction txn) + { + final InstanceProperties instanceProperties = + new InstanceProperties() + { + + @Override + public Object getProperty(final Property prop) + { + switch(prop) + { + case MANDATORY: + return false; + case REDELIVERED: + return false; + case PERSISTENT: + return message.isPersistent(); + case IMMEDIATE: + return false; + case EXPIRATION: + return message.getExpiration(); + } + return null; + }}; + + int enqueues = _exchange.send(message, instanceProperties, txn, null); + + + return enqueues == 0 ? REJECTED : ACCEPTED; + } + + TerminusDurability getDurability() + { + return _durability; + } + + TerminusExpiryPolicy getExpiryPolicy() + { + return _expiryPolicy; + } + + public int getCredit() + { + // TODO - fix + return 20000; + } + + public MessageDestination getDestination() + { + return _exchange; + } +} diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java index b9c10b925f..c2d124b427 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.ServerTransaction; @@ -35,9 +36,9 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED }; - private AMQQueue _queue; + private MessageSource _queue; - public QueueDestination(AMQQueue queue) + public QueueDestination(MessageSource queue) { _queue = queue; } @@ -60,7 +61,6 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio { try { - _queue.enqueue(message); } catch (Exception e) @@ -91,7 +91,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio return 100; } - public AMQQueue getQueue() + public MessageSource getQueue() { return _queue; } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 2fff1856c7..f7867d6178 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -66,6 +66,7 @@ import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.filter.JMSSelectorFilter; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.consumer.Consumer; @@ -95,7 +96,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>(); private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>(); private Runnable _closeAction; - private final AMQQueue _queue; + private final MessageSource _queue; public SendingLink_1_0(final SendingLinkAttachment linkAttachment, @@ -121,7 +122,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { _queue = ((QueueDestination) _destination).getQueue(); - if(_queue.getAvailableAttributes().contains("topic")) + if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic")) { source.setDistributionMode(StdDistMode.COPY); } @@ -217,7 +218,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { - _queue = _vhost.createQueue( + queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(name, _vhost.getName()), name, isDurable, @@ -229,8 +230,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } else { - _queue = queue; - List<Binding> bindings = _queue.getBindings(); + List<Binding> bindings = queue.getBindings(); List<Binding> bindingsToRemove = new ArrayList<Binding>(); for(Binding existingBinding : bindings) { @@ -313,15 +313,16 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } } } + _queue = queue; source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - exchange.addBinding(binding, _queue,null); + exchange.addBinding(binding, queue,null); source.setDistributionMode(StdDistMode.COPY); if(!isDurable) { final String queueName = name; - final AMQQueue tempQueue = _queue; + final AMQQueue tempQueue = queue; final Action<Connection_1_0> deleteQueueTask = new Action<Connection_1_0>() @@ -345,7 +346,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - _queue.addQueueDeleteTask(new Action<AMQQueue>() + queue.addQueueDeleteTask(new Action<AMQQueue>() { public void performAction(AMQQueue queue) { @@ -356,7 +357,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS }); } - qd = new QueueDestination(_queue); + qd = new QueueDestination(queue); } catch (AMQSecurityException e) { @@ -454,7 +455,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { try { - _queue.getVirtualHost().removeQueue(_queue); + _vhost.removeQueue((AMQQueue)_queue); } catch(AMQException e) { diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 51ff9c13cb..c7508fa913 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -41,6 +41,8 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -109,7 +111,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu source.setAddress(tempQueue.getName()); } String addr = source.getAddress(); - AMQQueue queue = _vhost.getQueue(addr); + MessageSource queue = _vhost.getMessageSource(addr); if(queue != null) { @@ -250,11 +252,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } String addr = target.getAddress(); - Exchange exchg = _vhost.getExchange(addr); - if(exchg != null) + MessageDestination messageDestination = _vhost.getMessageDestination(addr); + if(messageDestination != null) { - destination = new ExchangeDestination(exchg, target.getDurable(), - target.getExpiryPolicy()); + destination = new NodeReceivingDestination(messageDestination, target.getDurable(), + target.getExpiryPolicy()); } else { |