diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-03 02:14:59 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-03 02:14:59 +0000 |
commit | fc08306ab1474c3bf4b30942d921a8266e4bd724 (patch) | |
tree | bc7ac40adfc2c8ebe1b486fc331b6e481cab1d2d | |
parent | a9b950ac164bb7e2dd05ae44f99d4b728697ad65 (diff) | |
download | qpid-python-fc08306ab1474c3bf4b30942d921a8266e4bd724.tar.gz |
Updates to subscription
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1563758 13f79535-47bb-0310-9956-ffa450edef68
47 files changed, 2181 insertions, 2458 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 6a959df440..ff27c8159e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -39,8 +39,10 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -66,7 +68,7 @@ public abstract class AbstractExchange implements Exchange private VirtualHost _virtualHost; - private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<Exchange>> _closeTaskList = new CopyOnWriteArrayList<Action<Exchange>>(); /** * Whether the exchange is automatically deleted once all queues have detached from it @@ -138,6 +140,12 @@ public abstract class AbstractExchange implements Exchange if(_closed.compareAndSet(false,true)) { + List<Binding> bindings = new ArrayList<Binding>(_bindings); + for(Binding binding : bindings) + { + removeBinding(binding); + } + if(_alternateExchange != null) { _alternateExchange.removeReference(this); @@ -145,9 +153,9 @@ public abstract class AbstractExchange implements Exchange CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED()); - for(Task task : _closeTaskList) + for(Action<Exchange> task : _closeTaskList) { - task.onClose(this); + task.performAction(this); } _closeTaskList.clear(); } @@ -300,12 +308,12 @@ public abstract class AbstractExchange implements Exchange return !_referrers.isEmpty(); } - public void addCloseTask(final Task task) + public void addCloseTask(final Action<Exchange> task) { _closeTaskList.add(task); } - public void removeCloseTask(final Task task) + public void removeCloseTask(final Action<Exchange> task) { _closeTaskList.remove(task); } @@ -421,7 +429,7 @@ public abstract class AbstractExchange implements Exchange public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final BaseQueue.PostEnqueueAction postEnqueueAction) + final Action<QueueEntry> postEnqueueAction) { List<? extends BaseQueue> queues = route(message, instanceProperties); @@ -579,8 +587,6 @@ public abstract class AbstractExchange implements Exchange { doRemoveBinding(b); queue.removeBinding(b); - removeCloseTask(b); - queue.removeQueueDeleteTask(b); if (b.isDurable()) { @@ -659,8 +665,6 @@ public abstract class AbstractExchange implements Exchange DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b); } - queue.addQueueDeleteTask(b); - addCloseTask(b); queue.addBinding(b); doAddBinding(b); b.logCreation(); @@ -673,7 +677,7 @@ public abstract class AbstractExchange implements Exchange } } - private final class BindingImpl extends Binding implements AMQQueue.Task, Task + private final class BindingImpl extends Binding { private final BindingLogSubject _logSubject; //TODO : persist creation time @@ -689,12 +693,6 @@ public abstract class AbstractExchange implements Exchange } - - public void doTask(final AMQQueue queue) throws AMQException - { - removeBinding(this); - } - public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException { removeBinding(this); @@ -729,11 +727,4 @@ public abstract class AbstractExchange implements Exchange } - public static interface Task - { - public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException; - } - - - } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 71d0f8b4dd..db3464c463 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -42,8 +42,10 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchange implements Exchange @@ -334,7 +336,7 @@ public class DefaultExchange implements Exchange public final int send(final ServerMessage message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final BaseQueue.PostEnqueueAction postEnqueueAction) + final Action<QueueEntry> postEnqueueAction) { final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); if(q == null) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 18e912e972..af1eed9032 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -29,7 +29,9 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -105,7 +107,7 @@ public interface Exchange extends ExchangeReferrer int send(ServerMessage message, InstanceProperties instanceProperties, ServerTransaction txn, - BaseQueue.PostEnqueueAction postEnqueueAction); + Action<QueueEntry> postEnqueueAction); /** * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java index 696c59783e..850a6c9d80 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java @@ -47,7 +47,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer queueAdapter.getName(), subscription.getSessionModel().getConnectionModel().getRemoteAddressString(), String.valueOf(subscription.getSessionModel().getChannelId()), - subscription.getConsumerName()), queueAdapter.getTaskExecutor()); + subscription.getName()), queueAdapter.getTaskExecutor()); _subscription = subscription; _queue = queueAdapter; _session = sessionAdapter; @@ -57,7 +57,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer public String getName() { - return _subscription.getConsumerName(); + return _subscription.getName(); } public String setName(final String currentName, final String desiredName) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0cddd1ed3b..f6602c8071 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -132,8 +133,8 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; - void addQueueDeleteTask(final Task task); - void removeQueueDeleteTask(final Task task); + void addQueueDeleteTask(Action<AMQQueue> task); + void removeQueueDeleteTask(Action<AMQQueue> task); @@ -271,11 +272,6 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa } } - static interface Task - { - public void doTask(AMQQueue queue) throws AMQException; - } - void configure(QueueConfiguration config); void setExclusive(boolean exclusive); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index 7aba1a2342..bce2bd67cc 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -24,17 +24,12 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.util.Action; public interface BaseQueue extends TransactionLogResource { - public static interface PostEnqueueAction - { - public void onEnqueue(QueueEntry entry); - } - void enqueue(ServerMessage message) throws AMQException; - void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException; - void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException; + void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException; boolean isDurable(); boolean isDeleted(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 2aa1d1f473..212f2f65fa 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -25,6 +25,8 @@ import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> { @@ -42,11 +44,6 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> } - public static interface StateChangeListener - { - public void stateChanged(QueueEntry entry, State oldSate, State newState); - } - public abstract class EntryState { private EntryState() @@ -198,7 +195,7 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> boolean isRejectedBy(long subscriptionId); - int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn); + int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn); boolean isQueueDeleted(); @@ -206,8 +203,8 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> QueueEntry getNextValidEntry(); - void addStateChangeListener(StateChangeListener listener); - boolean removeStateChangeListener(StateChangeListener listener); + void addStateChangeListener(StateChangeListener<QueueEntry, State> listener); + boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener); /** diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 461d493437..f3a9a9dcc7 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -31,6 +31,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; import java.util.EnumMap; import java.util.HashSet; @@ -59,7 +61,7 @@ public abstract class QueueEntryImpl implements QueueEntry (QueueEntryImpl.class, EntryState.class, "_state"); - private volatile Set<StateChangeListener> _stateChangeListeners; + private volatile Set<StateChangeListener<QueueEntry, State>> _stateChangeListeners; private static final AtomicReferenceFieldUpdater<QueueEntryImpl, Set> @@ -231,11 +233,6 @@ public abstract class QueueEntryImpl implements QueueEntry if(state instanceof SubscriptionAcquiredState) { getQueue().decrementUnackedMsgCount(this); - Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription(); - if (subscription != null) - { - subscription.releaseQueueEntry(this); - } } if(!getQueue().isDeleted()) @@ -320,8 +317,6 @@ public abstract class QueueEntryImpl implements QueueEntry if (state instanceof SubscriptionAcquiredState) { getQueue().decrementUnackedMsgCount(this); - s = ((SubscriptionAcquiredState) state).getSubscription(); - s.onDequeue(this); } getQueue().dequeue(this,s); @@ -336,7 +331,7 @@ public abstract class QueueEntryImpl implements QueueEntry private void notifyStateChange(final State oldState, final State newState) { - for(StateChangeListener l : _stateChangeListeners) + for(StateChangeListener<QueueEntry, State> l : _stateChangeListeners) { l.stateChanged(this, oldState, newState); } @@ -367,7 +362,7 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn) + public int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); @@ -412,21 +407,21 @@ public abstract class QueueEntryImpl implements QueueEntry return getQueue().isDeleted(); } - public void addStateChangeListener(StateChangeListener listener) + public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener) { - Set<StateChangeListener> listeners = _stateChangeListeners; + Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners; if(listeners == null) { - _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>()); + _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<QueueEntry, State>>()); listeners = _stateChangeListeners; } listeners.add(listener); } - public boolean removeStateChangeListener(StateChangeListener listener) + public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener) { - Set<StateChangeListener> listeners = _stateChangeListeners; + Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners; if(listeners != null) { return listeners.remove(listener); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 87d11a892e..5f79498beb 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -61,9 +61,13 @@ import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; -public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper +public class SimpleAMQQueue implements AMQQueue, + StateChangeListener<Subscription, Subscription.State>, + MessageGroupManager.SubscriptionResetHelper { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); @@ -121,10 +125,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong(); private final AtomicLong _persistentMessageDequeueCount = new AtomicLong(); private final AtomicInteger _consumerCountHigh = new AtomicInteger(0); - private final AtomicLong _msgTxnEnqueues = new AtomicLong(0); - private final AtomicLong _byteTxnEnqueues = new AtomicLong(0); - private final AtomicLong _msgTxnDequeues = new AtomicLong(0); - private final AtomicLong _byteTxnDequeues = new AtomicLong(0); private final AtomicLong _unackedMsgCount = new AtomicLong(0); private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0); private final AtomicLong _unackedMsgBytes = new AtomicLong(); @@ -165,7 +165,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>(); private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>(); private LogSubject _logSubject; @@ -451,7 +451,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (isDeleted()) { - subscription.queueDeleted(this); + subscription.queueDeleted(); } } else @@ -505,7 +505,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes // we need to manually fire the event to the removed subscription (which was the last one left for this // queue. This is because the delete method uses the subscription set which has just been cleared - subscription.queueDeleted(this); + subscription.queueDeleted(); } } @@ -622,18 +622,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes enqueue(message, null); } - public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException + public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException { - enqueue(message, false, action); - } - - public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException - { - - if(transactional) - { - incrementTxnEnqueueStats(message); - } incrementQueueCount(); incrementQueueSize(message); @@ -715,7 +705,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if(action != null) { - action.onEnqueue(entry); + action.performAction(entry); } } @@ -810,18 +800,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes getAtomicQueueCount().incrementAndGet(); } - private void incrementTxnEnqueueStats(final ServerMessage message) - { - _msgTxnEnqueues.incrementAndGet(); - _byteTxnEnqueues.addAndGet(message.getSize()); - } - - private void incrementTxnDequeueStats(QueueEntry entry) - { - _msgTxnDequeues.incrementAndGet(); - _byteTxnDequeues.addAndGet(entry.getSize()); - } - private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch) throws AMQException { @@ -900,11 +878,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _deliveredMessages.decrementAndGet(); } - if(sub != null && sub.isSessionTransactional()) - { - incrementTxnDequeueStats(entry); - } - checkCapacity(); } @@ -1039,7 +1012,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState) + public void stateChanged(Subscription sub, Subscription.State oldState, Subscription.State newState) { if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) { @@ -1300,12 +1273,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes }); } - public void addQueueDeleteTask(final Task task) + public void addQueueDeleteTask(final Action<AMQQueue> task) { _deleteTaskList.add(task); } - public void removeQueueDeleteTask(final Task task) + public void removeQueueDeleteTask(final Action<AMQQueue> task) { _deleteTaskList.remove(task); } @@ -1322,7 +1295,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (!_deleted.getAndSet(true)) { - for (Binding b : _bindings) + final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings); + + for (Binding b : bindingCopy) { b.getExchange().removeBinding(b); } @@ -1334,7 +1309,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes Subscription s = subscriptionIter.getNode().getSubscription(); if (s != null) { - s.queueDeleted(this); + s.queueDeleted(); } } @@ -1375,9 +1350,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - for (Task task : _deleteTaskList) + for (Action<AMQQueue> task : _deleteTaskList) { - task.doTask(this); + task.performAction(this); } _deleteTaskList.clear(); @@ -1984,7 +1959,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _notificationChecks; } - private final class QueueEntryListener implements QueueEntry.StateChangeListener + private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State> { private final Subscription _sub; @@ -2076,26 +2051,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _dequeueSize.get(); } - public long getByteTxnEnqueues() - { - return _byteTxnEnqueues.get(); - } - - public long getByteTxnDequeues() - { - return _byteTxnDequeues.get(); - } - - public long getMsgTxnEnqueues() - { - return _msgTxnEnqueues.get(); - } - - public long getMsgTxnDequeues() - { - return _msgTxnDequeues.get(); - } - public long getPersistentByteEnqueues() { return _persistentMessageEnqueueSize.get(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index b3566df0c4..6c63b30273 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; @@ -48,7 +49,7 @@ public class SortedQueue extends OutOfOrderQueue return _sortedPropertyName; } - public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException + public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException { synchronized (_sortedQueueLock) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java new file mode 100644 index 0000000000..213d8b7730 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java @@ -0,0 +1,344 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.SubscriptionActor; +import org.apache.qpid.server.logging.messages.SubscriptionMessages; +import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.util.StateChangeListener; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public abstract class AbstractSubscription implements Subscription +{ + private final long _subscriptionID; + private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); + private final Lock _stateChangeLock = new ReentrantLock(); + private final long _createTime = System.currentTimeMillis(); + private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + private final boolean _acquires; + private final boolean _seesRequeues; + private final String _consumerName; + private final boolean _isTransient; + + + private final AtomicLong _deliveredCount = new AtomicLong(0); + private final AtomicLong _deliveredBytes = new AtomicLong(0); + + private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); + + + private final FilterManager _filters; + + private volatile AMQQueue.Context _queueContext; + + + private StateChangeListener<Subscription, State> _stateListener = new StateChangeListener<Subscription, State>() + { + public void stateChanged(Subscription sub, State oldState, State newState) + { + CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); + } + }; + + private SubscriptionLogSubject _logSubject; + private AMQQueue _queue; + private String _traceExclude; + private String _trace; + private SubscriptionActor _logActor; + private final Class<? extends ServerMessage> _messageClass; + private final Object _sessionReference; + private boolean _noLocal; + + protected AbstractSubscription(FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final Object sessionReference, + final boolean acquires, + final boolean seesRequeues, + final String consumerName, final boolean isTransient) + { + _messageClass = messageClass; + _sessionReference = sessionReference; + _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); + _filters = filters; + _acquires = acquires; + _seesRequeues = seesRequeues; + _consumerName = consumerName; + _isTransient = isTransient; + } + + public final long getSubscriptionID() + { + return _subscriptionID; + } + + + public final StateChangeListener<Subscription, State> getStateListener() + { + return _stateListener; + } + + public final void setStateListener(StateChangeListener<Subscription, State> listener) + { + _stateListener = listener; + } + + + public final AMQQueue.Context getQueueContext() + { + return _queueContext; + } + + public final void setQueueContext(AMQQueue.Context queueContext) + { + _queueContext = queueContext; + } + + + public State getState() + { + return _state.get(); + } + + protected boolean updateState(State from, State to) + { + return _state.compareAndSet(from, to); + } + + public final boolean isActive() + { + return getState() == State.ACTIVE; + } + + public final boolean isClosed() + { + return getState() == State.CLOSED; + } + + + public final void setNoLocal(boolean noLocal) + { + _noLocal = noLocal; + } + + + public final boolean hasInterest(QueueEntry entry) + { + //check that the message hasn't been rejected + if (entry.isRejectedBy(getSubscriptionID())) + { + + return false; + } + + if (entry.getMessage().getClass() == _messageClass) + { + if(_noLocal) + { + Object connectionRef = entry.getMessage().getConnectionReference(); + if (connectionRef != null && connectionRef == _sessionReference) + { + return false; + } + } + } + else + { + // no interest in messages we can't convert + if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null) + { + return false; + } + } + return (_filters == null) || _filters.allAllow(entry.asFilterable()); + } + + + protected String getFilterLogString() + { + StringBuilder filterLogString = new StringBuilder(); + String delimiter = ", "; + boolean hasEntries = false; + if (_filters != null && _filters.hasFilters()) + { + filterLogString.append(_filters.toString()); + hasEntries = true; + } + + if (!acquires()) + { + if (hasEntries) + { + filterLogString.append(delimiter); + } + filterLogString.append("Browser"); + hasEntries = true; + } + + return filterLogString.toString(); + } + + + public final boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } + + public final void getSendLock() + { + _stateChangeLock.lock(); + } + + public final void releaseSendLock() + { + _stateChangeLock.unlock(); + } + + public final AMQQueue getQueue() + { + return _queue; + } + + public final void setQueue(AMQQueue queue, boolean exclusive) + { + if(getQueue() != null) + { + throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); + } + _queue = queue; + + _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES); + _trace = (String) queue.getAttribute(Queue.FEDERATION_ID); + + _logSubject = new SubscriptionLogSubject(this); + _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this); + + if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logSubject, SubscriptionMessages.CREATE_LOG_HIERARCHY)) + { + final String filterLogString = getFilterLogString(); + CurrentActor.get().message(_logSubject, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive, + filterLogString.length() > 0)); + } + } + + protected final String getTraceExclude() + { + return _traceExclude; + } + + protected final String getTrace() + { + return _trace; + } + + protected final LogSubject getLogSubject() + { + return _logSubject; + } + + public final LogActor getLogActor() + { + return _logActor; + } + + public final long getCreateTime() + { + return _createTime; + } + + + public final QueueEntry.SubscriptionAcquiredState getOwningState() + { + return _owningState; + } + + + public final void set(String key, Object value) + { + _properties.put(key, value); + } + + public final Object get(String key) + { + return _properties.get(key); + } + + public final boolean acquires() + { + return _acquires; + } + + public final boolean seesRequeues() + { + return _seesRequeues; + } + + public final String getName() + { + return _consumerName; + } + + public final boolean isTransient() + { + return _isTransient; + } + + + public final long getBytesOut() + { + return _deliveredBytes.longValue(); + } + + public final long getMessagesOut() + { + return _deliveredCount.longValue(); + } + + public final void send(final QueueEntry entry, final boolean batch) throws AMQException + { + _deliveredCount.incrementAndGet(); + _deliveredBytes.addAndGet(entry.getMessage().getSize()); + doSend(entry, batch); + } + + protected abstract void doSend(final QueueEntry entry, final boolean batch) throws AMQException; + + @Override + public final void flush() throws AMQException + { + getQueue().flushSubscription(this); + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java new file mode 100644 index 0000000000..4d9550907e --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.server.util.StateChangeListener; + +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractSubscriptionTarget implements SubscriptionTarget +{ + + private final AtomicReference<State> _state; + private final AtomicReference<StateChangeListener<SubscriptionTarget, State>> _stateListener = + new AtomicReference<StateChangeListener<SubscriptionTarget, State>>(); + + protected AbstractSubscriptionTarget(final State initialState) + { + _state = new AtomicReference<State>(initialState); + } + + + public final State getState() + { + return _state.get(); + } + + protected final boolean updateState(State from, State to) + { + if(_state.compareAndSet(from, to)) + { + StateChangeListener<SubscriptionTarget, State> listener = _stateListener.get(); + if(listener != null) + { + listener.stateChanged(this, from, to); + } + return true; + } + else + { + return false; + } + } + + + public final void setStateListener(StateChangeListener<SubscriptionTarget, State> listener) + { + _stateListener.set(listener); + } + + public final StateChangeListener<SubscriptionTarget, State> getStateListener() + { + return _stateListener.get(); + } + +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java index 55110c46de..9b5e4fd10a 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.server.queue.QueueEntryVisitor; +import org.apache.qpid.server.util.StateChangeListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,7 +241,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return groupVal; } - private class GroupStateChangeListener implements QueueEntry.StateChangeListener + private class GroupStateChangeListener implements StateChangeListener<QueueEntry, QueueEntry.State> { private final Group _group; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java new file mode 100644 index 0000000000..623371c84c --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java @@ -0,0 +1,189 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.SubscriptionMessages; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.util.StateChangeListener; + +import java.util.EnumMap; +import java.util.concurrent.atomic.AtomicBoolean; + +public class DelegatingSubscription<T extends SubscriptionTarget> extends AbstractSubscription +{ + private static final Logger _logger = Logger.getLogger(DelegatingSubscription.class); + private final AtomicBoolean _closed = new AtomicBoolean(false); + + static final EnumMap<SubscriptionTarget.State, State> STATE_MAP = + new EnumMap<SubscriptionTarget.State, State>(SubscriptionTarget.State.class); + + static + { + STATE_MAP.put(SubscriptionTarget.State.ACTIVE, State.ACTIVE); + STATE_MAP.put(SubscriptionTarget.State.SUSPENDED, State.SUSPENDED); + STATE_MAP.put(SubscriptionTarget.State.CLOSED, State.CLOSED); + } + + private final T _target; + + public DelegatingSubscription(final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final boolean acquires, + final boolean seesRequeues, + final String consumerName, + final boolean isTransient, + T target) + { + super(filters, messageClass, target.getSessionModel().getConnectionReference(), + acquires, seesRequeues, consumerName, isTransient); + _target = target; + _target.setStateListener( + new StateChangeListener<SubscriptionTarget, SubscriptionTarget.State>() + { + @Override + public void stateChanged(final SubscriptionTarget object, + final SubscriptionTarget.State oldState, + final SubscriptionTarget.State newState) + { + targetStateChanged(oldState, newState); + } + }); + } + + private void targetStateChanged(final SubscriptionTarget.State oldState, final SubscriptionTarget.State newState) + { + if(oldState != newState) + { + if(newState == SubscriptionTarget.State.CLOSED) + { + if(_closed.compareAndSet(false,true)) + { + CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE()); + } + } + else + { + CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString())); + } + } + + if(newState == SubscriptionTarget.State.CLOSED && oldState != newState) + { + try + { + getQueue().unregisterSubscription(this); + } + catch (AMQException e) + { + _logger.error("Unable to remove to remove subscription", e); + throw new RuntimeException(e); + } + } + final StateChangeListener<Subscription, State> stateListener = getStateListener(); + if(stateListener != null) + { + stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState)); + } + } + + public T getTarget() + { + return _target; + } + + @Override + public long getUnacknowledgedBytes() + { + return _target.getUnacknowledgedBytes(); + } + + @Override + public long getUnacknowledgedMessages() + { + return _target.getUnacknowledgedMessages(); + } + + @Override + public AMQSessionModel getSessionModel() + { + return _target.getSessionModel(); + } + + @Override + public boolean isSuspended() + { + return _target.isSuspended(); + } + + @Override + public void close() + { + _target.close(); + } + + @Override + protected void doSend(final QueueEntry entry, final boolean batch) throws AMQException + { + _target.send(entry, batch); + } + + @Override + public void flushBatched() + { + _target.flushBatched(); + } + + @Override + public void queueDeleted() + { + _target.queueDeleted(); + } + + @Override + public boolean wouldSuspend(final QueueEntry msg) + { + return !_target.allocateCredit(msg); + } + + @Override + public void restoreCredit(final QueueEntry queueEntry) + { + _target.restoreCredit(queueEntry); + } + + @Override + public void queueEmpty() throws AMQException + { + _target.queueEmpty(); + } + + @Override + public State getState() + { + return STATE_MAP.get(_target.getState()); + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java index fde3d3809c..f9278cf719 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.util.StateChangeListener; public interface Subscription { @@ -50,11 +51,6 @@ public interface Subscription CLOSED } - public static interface StateListener - { - public void stateChange(Subscription sub, State oldState, State newState); - } - AMQQueue getQueue(); AMQSessionModel getSessionModel(); @@ -82,7 +78,7 @@ public interface Subscription void flushBatched(); - void queueDeleted(AMQQueue queue); + void queueDeleted(); boolean wouldSuspend(QueueEntry msg); @@ -94,13 +90,9 @@ public interface Subscription void releaseSendLock(); - void releaseQueueEntry(final QueueEntry queueEntryImpl); - - void onDequeue(final QueueEntry queueEntry); - void restoreCredit(final QueueEntry queueEntry); - void setStateListener(final StateListener listener); + void setStateListener(final StateChangeListener<Subscription, State> listener); public State getState(); @@ -115,9 +107,9 @@ public interface Subscription public Object get(String key); - boolean isSessionTransactional(); - void queueEmpty() throws AMQException; - String getConsumerName(); + String getName(); + + void flush() throws AMQException; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java new file mode 100644 index 0000000000..0b0be38f42 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.util.StateChangeListener; + +public interface SubscriptionTarget +{ + + + enum State + { + ACTIVE, SUSPENDED, CLOSED + } + + State getState(); + + void setStateListener(StateChangeListener<SubscriptionTarget, State> listener); + + long getUnacknowledgedBytes(); + + long getUnacknowledgedMessages(); + + AMQSessionModel getSessionModel(); + + void send(QueueEntry entry, boolean batch) throws AMQException; + + void flushBatched(); + + void queueDeleted(); + + void queueEmpty() throws AMQException; + + boolean allocateCredit(QueueEntry msg); + + void restoreCredit(QueueEntry queueEntry); + + boolean isSuspended(); + + boolean close(); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java b/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java new file mode 100644 index 0000000000..0d53b4d03b --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +public interface Action<T> +{ + void performAction(T object); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java b/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java new file mode 100644 index 0000000000..b5dc90cfb6 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +public interface StateChangeListener<T, E extends Enum> +{ + void stateChanged(T object, E oldState, E newState); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index b7d3cf872b..b01f1d1ebc 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -141,7 +141,7 @@ public class VirtualHostConfigRecoveryHandler implements try { - queue.enqueue(message, true, null); + queue.enqueue(message, null); ref.release(); } catch (AMQException e) diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index ea2e29d40d..98ecdcdd3b 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -297,23 +298,15 @@ public class MockAMQQueue implements AMQQueue { } - public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException + public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException { } - public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException - { - } - public void requeue(QueueEntry entry) { } - public void requeue(QueueEntryImpl storeContext, Subscription subscription) - { - } - public void dequeue(QueueEntry entry, Subscription sub) { } @@ -323,11 +316,11 @@ public class MockAMQQueue implements AMQQueue return false; } - public void addQueueDeleteTask(Task task) + public void addQueueDeleteTask(Action<AMQQueue> task) { } - public void removeQueueDeleteTask(final Task task) + public void removeQueueDeleteTask(final Action<AMQQueue> task) { } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index d3c866f747..de5bbc2347 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -27,6 +27,8 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; public class MockQueueEntry implements QueueEntry { @@ -53,7 +55,7 @@ public class MockQueueEntry implements QueueEntry return false; } - public void addStateChangeListener(StateChangeListener listener) + public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener) { } @@ -63,7 +65,7 @@ public class MockQueueEntry implements QueueEntry } - public int routeToAlternate(final BaseQueue.PostEnqueueAction action, final ServerTransaction txn) + public int routeToAlternate(final Action<QueueEntry> action, final ServerTransaction txn) { return 0; } @@ -137,7 +139,7 @@ public class MockQueueEntry implements QueueEntry } - public boolean removeStateChangeListener(StateChangeListener listener) + public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener) { return false; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index b0e5a510b8..ffd64774c0 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -41,10 +41,10 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -228,9 +228,9 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.registerSubscription(_subscription, false); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - PostEnqueueAction postEnqueueAction = new PostEnqueueAction() + Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() { - public void onEnqueue(QueueEntry entry) + public void performAction(QueueEntry entry) { queueEntries.add(entry); } @@ -276,9 +276,9 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.registerSubscription(_subscription, false); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - PostEnqueueAction postEnqueueAction = new PostEnqueueAction() + Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() { - public void onEnqueue(QueueEntry entry) + public void performAction(QueueEntry entry) { queueEntries.add(entry); } @@ -323,9 +323,9 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.registerSubscription(_subscription, false); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - PostEnqueueAction postEnqueueAction = new PostEnqueueAction() + Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() { - public void onEnqueue(QueueEntry entry) + public void performAction(QueueEntry entry) { queueEntries.add(entry); } @@ -376,9 +376,9 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.registerSubscription(subscription2, false); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - PostEnqueueAction postEnqueueAction = new PostEnqueueAction() + Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() { - public void onEnqueue(QueueEntry entry) + public void performAction(QueueEntry entry) { queueEntries.add(entry); } @@ -1011,37 +1011,37 @@ public class SimpleAMQQueueTest extends QpidTestCase //verify behaviour in face of expected state changes: //verify a subscription going suspended->active increases the count - queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE); + queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE); assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount()); //verify a subscription going active->suspended decreases the count - queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED); + queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going suspended->closed doesn't change the count - queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED); + queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going active->closed decreases the count - queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED); + queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); //verify behaviour in face of unexpected state changes: //verify a subscription going closed->active increases the count - queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE); + queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going active->active doesn't change the count - queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE); + queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going closed->suspended doesn't change the count - queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED); + queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going suspended->suspended doesn't change the count - queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED); + queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 8d1b27e272..eec1edca35 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.util.StateChangeListener; import java.util.ArrayList; import java.util.List; @@ -47,7 +48,7 @@ public class MockSubscription implements Subscription private boolean _closed = false; private String tag = "mocktag"; private AMQQueue queue = null; - private StateListener _listener = null; + private StateChangeListener<Subscription, State> _listener = null; private volatile AMQQueue.Context _queueContext = null; private State _state = State.ACTIVE; private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); @@ -75,16 +76,22 @@ public class MockSubscription implements Subscription _closed = true; if (_listener != null) { - _listener.stateChange(this, _state, State.CLOSED); + _listener.stateChanged(this, _state, State.CLOSED); } _state = State.CLOSED; } - public String getConsumerName() + public String getName() { return tag; } + @Override + public void flush() throws AMQException + { + + } + public long getSubscriptionID() { return _subscriptionID; @@ -202,7 +209,7 @@ public class MockSubscription implements Subscription return false; } - public void queueDeleted(AMQQueue queue) + public void queueDeleted() { } @@ -211,18 +218,10 @@ public class MockSubscription implements Subscription _stateChangeLock.unlock(); } - public void onDequeue(QueueEntry queueEntry) - { - } - public void restoreCredit(QueueEntry queueEntry) { } - public void releaseQueueEntry(QueueEntry queueEntry) - { - } - public void send(QueueEntry entry, boolean batch) throws AMQException { if (messages.contains(entry)) @@ -251,7 +250,7 @@ public class MockSubscription implements Subscription { } - public void setStateListener(StateListener listener) + public void setStateListener(StateChangeListener<Subscription, State> listener) { this._listener = listener; } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index 4b38b8a1a3..04510064de 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -31,20 +31,19 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private final QueueEntry _entry; - private final Subscription_0_10 _sub; + private final SubscriptionTarget_0_10 _target; - public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10) + public ExplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target) { _entry = entry; - _sub = subscription_0_10; + _target = target; } public void onAccept() { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) + if(_target != null && _entry.isAcquiredBy(_target.getSubscription())) { - subscription.getSessionModel().acknowledge(subscription, _entry); + _target.getSessionModel().acknowledge(_target, _entry); } else { @@ -55,10 +54,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) + if(_target != null && _entry.isAcquiredBy(_target.getSubscription())) { - subscription.release(_entry, setRedelivered); + _target.release(_entry, setRedelivered); } else { @@ -68,10 +66,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) + if(_target != null && _entry.isAcquiredBy(_target.getSubscription())) { - subscription.reject(_entry); + _target.reject(_entry); } else { @@ -82,12 +79,8 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - return _entry.acquire(getSubscription()); + return _entry.acquire(_target.getSubscription()); } - private Subscription_0_10 getSubscription() - { - return _sub; - } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java index ce0155b789..0cdced728a 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java @@ -30,12 +30,12 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private final QueueEntry _entry; - private Subscription_0_10 _sub; + private SubscriptionTarget_0_10 _target; - public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10) + public ImplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target) { _entry = entry; - _sub = subscription_0_10; + _target = target; } public void onAccept() @@ -45,9 +45,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - if(_entry.isAcquiredBy(_sub)) + if(_entry.isAcquiredBy(_target.getSubscription())) { - getSubscription().release(_entry, setRedelivered); + _target.release(_entry, setRedelivered); } else { @@ -57,9 +57,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - if(_entry.isAcquiredBy(_sub)) + if(_entry.isAcquiredBy(_target.getSubscription())) { - getSubscription().reject(_entry); + _target.reject(_entry); } else { @@ -70,19 +70,15 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - boolean acquired = _entry.acquire(getSubscription()); + boolean acquired = _entry.acquire(_target.getSubscription()); if(acquired) { - getSubscription().recordUnacknowledged(_entry); + _target.recordUnacknowledged(_entry); } return acquired; } - public Subscription_0_10 getSubscription() - { - return _sub; - } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index f5f2a8d43f..34a7f2e526 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -26,12 +26,12 @@ import org.apache.qpid.transport.Method; public class MessageAcceptCompletionListener implements Method.CompletionListener { - private final Subscription_0_10 _sub; + private final SubscriptionTarget_0_10 _sub; private final QueueEntry _entry; private final ServerSession _session; private boolean _restoreCredit; - public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit) + public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit) { super(); _sub = sub; @@ -46,7 +46,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene { _sub.restoreCredit(_entry); } - if(_entry.isAcquiredBy(_sub)) + if(_entry.isAcquiredBy(_sub.getSubscription())) { _session.acknowledge(_sub, _entry); } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index a15fea1200..6348510b09 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -282,8 +282,8 @@ public class ServerConnectionDelegate extends ServerDelegate private void stopAllSubscriptions(Connection conn, SessionDetach dtc) { final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); - final Collection<Subscription_0_10> subs = ssn.getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subs) + final Collection<SubscriptionTarget_0_10> subs = ssn.getSubscriptions(); + for (SubscriptionTarget_0_10 subscription_0_10 : subs) { subscription_0_10.stop(); } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index bae5616042..6a5c69fed0 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -55,15 +55,14 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.DistributedTransaction; @@ -77,6 +76,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; import org.slf4j.Logger; @@ -104,10 +104,10 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction() + private final Action<QueueEntry> _checkCapacityAction = new Action<QueueEntry>() { @Override - public void onEnqueue(final QueueEntry entry) + public void performAction(final QueueEntry entry) { entry.getQueue().checkCapacity(ServerSession.this); } @@ -126,12 +126,6 @@ public class ServerSession extends Session } - public static interface Task - { - public void doTask(ServerSession session); - } - - private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); @@ -142,9 +136,9 @@ public class ServerSession extends Session private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); + private Map<String, SubscriptionTarget_0_10> _subscriptions = new ConcurrentHashMap<String, SubscriptionTarget_0_10>(); - private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>(); private final TransactionTimeoutHelper _transactionTimeoutHelper; @@ -386,9 +380,9 @@ public class ServerSession extends Session } _messageDispositionListenerMap.clear(); - for (Task task : _taskList) + for (Action<ServerSession> task : _taskList) { - task.doTask(this); + task.performAction(this); } LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get(); @@ -405,7 +399,7 @@ public class ServerSession extends Session // Broker shouldn't block awaiting close - thus do override this method to do nothing } - public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) + public void acknowledge(final SubscriptionTarget_0_10 sub, final QueueEntry entry) { _transaction.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action() @@ -426,37 +420,28 @@ public class ServerSession extends Session }); } - public Collection<Subscription_0_10> getSubscriptions() + public Collection<SubscriptionTarget_0_10> getSubscriptions() { return _subscriptions.values(); } - public void register(String destination, Subscription_0_10 sub) + public void register(String destination, SubscriptionTarget_0_10 sub) { _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub); } - public Subscription_0_10 getSubscription(String destination) + public SubscriptionTarget_0_10 getSubscription(String destination) { return _subscriptions.get(destination == null ? NULL_DESTINATION : destination); } - public void unregister(Subscription_0_10 sub) + public void unregister(SubscriptionTarget_0_10 sub) { _subscriptions.remove(sub.getName()); try { sub.getSendLock(); - AMQQueue queue = sub.getQueue(); - if(queue != null) - { - queue.unregisterSubscription(sub); - } - } - catch (AMQException e) - { - // TODO - _logger.error("Failed to unregister subscription :" + e.getMessage(), e); + sub.close(); } finally { @@ -638,12 +623,12 @@ public class ServerSession extends Session return getConnection().getAuthorizedSubject(); } - public void addSessionCloseTask(Task task) + public void addSessionCloseTask(Action<ServerSession> task) { _taskList.add(task); } - public void removeSessionCloseTask(Task task) + public void removeSessionCloseTask(Action<ServerSession> task) { _taskList.remove(task); } @@ -829,8 +814,8 @@ public class ServerSession extends Session void unregisterSubscriptions() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions(); + for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions) { unregister(subscription_0_10); } @@ -838,8 +823,8 @@ public class ServerSession extends Session void stopSubscriptions() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions(); + for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions) { subscription_0_10.stop(); } @@ -848,58 +833,14 @@ public class ServerSession extends Session public void receivedComplete() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions(); + for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions) { subscription_0_10.flushCreditState(false); } awaitCommandCompletion(); } - private class PostEnqueueAction implements ServerTransaction.Action - { - - private final MessageReference<MessageTransferMessage> _reference; - private final List<? extends BaseQueue> _queues; - private final boolean _transactional; - - public PostEnqueueAction(List<? extends BaseQueue> queues, MessageTransferMessage message, final boolean transactional) - { - _reference = message.newReference(); - _transactional = transactional; - _queues = queues; - } - - public void postCommit() - { - for(int i = 0; i < _queues.size(); i++) - { - try - { - BaseQueue queue = _queues.get(i); - queue.enqueue(_reference.getMessage(), _transactional, null); - if(queue instanceof AMQQueue) - { - ((AMQQueue)queue).checkCapacity(ServerSession.this); - } - - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - _reference.release(); - } - - public void onRollback() - { - // NO-OP - _reference.release(); - } - } - public int getUnacknowledgedMessageCount() { return _messageDispositionListenerMap.size(); diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index dcca696529..d3480e3223 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -45,6 +45,8 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.subscription.DelegatingSubscription; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; import org.apache.qpid.server.txn.IncorrectDtxStateException; @@ -55,6 +57,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; @@ -214,9 +217,9 @@ public class ServerSessionDelegate extends SessionDelegate ServerSession s = (ServerSession) session; queue.setExclusiveOwningSession(s); - ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() + ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { if(queue.getExclusiveOwningSession() == session) { @@ -228,9 +231,9 @@ public class ServerSessionDelegate extends SessionDelegate if(queue.getAuthorizationHolder() == null) { queue.setAuthorizationHolder(s); - ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() + ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { if(queue.getAuthorizationHolder() == session) { @@ -254,16 +257,21 @@ public class ServerSessionDelegate extends SessionDelegate return; } - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, - destination, - method.getAcceptMode(), - method.getAcquireMode(), - MessageFlowMode.WINDOW, - creditManager, - filterManager, - method.getArguments()); + SubscriptionTarget_0_10 target = new SubscriptionTarget_0_10((ServerSession)session, destination, + method.getAcceptMode(), + method.getAcquireMode(), + MessageFlowMode.WINDOW, + creditManager, + filterManager, + method.getArguments()); - ((ServerSession)session).register(destination, sub); + Subscription sub = new DelegatingSubscription<SubscriptionTarget_0_10>(filterManager, MessageTransferMessage.class, + method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED, + method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT,destination,false,target); + + target.setSubscription(sub); + + ((ServerSession)session).register(destination, target); try { queue.registerSubscription(sub, method.getExclusive()); @@ -385,7 +393,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = method.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -393,7 +401,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - AMQQueue queue = sub.getQueue(); + AMQQueue queue = sub.getSubscription().getQueue(); ((ServerSession)session).unregister(sub); if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0) { @@ -407,7 +415,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = method.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1249,9 +1257,9 @@ public class ServerSessionDelegate extends SessionDelegate if (autoDelete && exclusive) { final AMQQueue q = queue; - final ServerSession.Task deleteQueueTask = new ServerSession.Task() + final Action<ServerSession> deleteQueueTask = new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { try { @@ -1265,9 +1273,9 @@ public class ServerSessionDelegate extends SessionDelegate }; final ServerSession s = (ServerSession) session; s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) throws AMQException + public void performAction(AMQQueue queue) { s.removeSessionCloseTask(deleteQueueTask); } @@ -1276,9 +1284,9 @@ public class ServerSessionDelegate extends SessionDelegate if (exclusive) { final AMQQueue q = queue; - final ServerSession.Task removeExclusive = new ServerSession.Task() + final Action<ServerSession> removeExclusive = new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { q.setAuthorizationHolder(null); q.setExclusiveOwningSession(null); @@ -1287,9 +1295,9 @@ public class ServerSessionDelegate extends SessionDelegate final ServerSession s = (ServerSession) session; q.setExclusiveOwningSession(s); s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) throws AMQException + public void performAction(AMQQueue queue) { s.removeSessionCloseTask(removeExclusive); } @@ -1461,7 +1469,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = sfm.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1478,7 +1486,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = stop.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1496,7 +1504,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = flow.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java new file mode 100644 index 0000000000..c151eddb46 --- /dev/null +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java @@ -0,0 +1,582 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.subscription.AbstractSubscriptionTarget; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.*; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener +{ + + private static final Option[] BATCHED = new Option[] { Option.BATCH }; + + private final AtomicBoolean _deleted = new AtomicBoolean(false); + + + private FlowCreditManager_0_10 _creditManager; + + private final MessageAcceptMode _acceptMode; + private final MessageAcquireMode _acquireMode; + private MessageFlowMode _flowMode; + private final ServerSession _session; + private final AtomicBoolean _stopped = new AtomicBoolean(true); + private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; + + private final AtomicLong _unacknowledgedCount = new AtomicLong(0); + private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); + + private final Map<String, Object> _arguments; + private int _deferredMessageCredit; + private long _deferredSizeCredit; + private Subscription _subscription; + + + public SubscriptionTarget_0_10(ServerSession session, + String destination, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + MessageFlowMode flowMode, + FlowCreditManager_0_10 creditManager, + FilterManager filters, + Map<String, Object> arguments) + { + super(State.SUSPENDED); + _session = session; + _postIdSettingAction = new AddMessageDispositionListenerAction(session); + _acceptMode = acceptMode; + _acquireMode = acquireMode; + _creditManager = creditManager; + _flowMode = flowMode; + _creditManager.addStateListener(this); + _arguments = arguments == null ? Collections.<String, Object> emptyMap() : + Collections.<String, Object> unmodifiableMap(arguments); + + } + + + public void setSubscription(Subscription subscription) + { + _subscription = subscription; + } + + public Subscription getSubscription() + { + return _subscription; + } + + public boolean isSuspended() + { + return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension + } + + public boolean close() + { + boolean closed = false; + State state = getState(); + + getSubscription().getSendLock(); + try + { + while(!closed && state != State.CLOSED) + { + closed = updateState(state, State.CLOSED); + closed = updateState(state, State.CLOSED); + if(!closed) + { + state = getState(); + } + } + _creditManager.removeListener(this); + } + finally + { + getSubscription().releaseSendLock(); + } + + return closed; + + } + + public void creditStateChanged(boolean hasCredit) + { + + if(hasCredit) + { + if(!updateState(State.SUSPENDED, State.ACTIVE)) + { + // this is a hack to get round the issue of increasing bytes credit + getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE); + } + } + else + { + updateState(State.ACTIVE, State.SUSPENDED); + } + } + + public String getName() + { + return getSubscription().getName(); + } + + public void getSendLock() + { + getSubscription().getSendLock(); + } + + public void releaseSendLock() + { + getSubscription().releaseSendLock(); + } + + + public static class AddMessageDispositionListenerAction implements Runnable + { + private MessageTransfer _xfr; + private ServerSession.MessageDispositionChangeListener _action; + private ServerSession _session; + + public AddMessageDispositionListenerAction(ServerSession session) + { + _session = session; + } + + public void setXfr(MessageTransfer xfr) + { + _xfr = xfr; + } + + public void setAction(ServerSession.MessageDispositionChangeListener action) + { + _action = action; + } + + public void run() + { + if(_action != null) + { + _session.onMessageDispositionChange(_xfr, _action); + } + } + } + + private final AddMessageDispositionListenerAction _postIdSettingAction; + + public void send(final QueueEntry entry, boolean batch) throws AMQException + { + ServerMessage serverMsg = entry.getMessage(); + + + MessageTransfer xfr; + + DeliveryProperties deliveryProps; + MessageProperties messageProps = null; + + MessageTransferMessage msg; + + if(serverMsg instanceof MessageTransferMessage) + { + + msg = (MessageTransferMessage) serverMsg; + + } + else + { + MessageConverter converter = + MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class); + + + msg = (MessageTransferMessage) converter.convert(serverMsg, _session.getVirtualHost()); + } + DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); + messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); + + deliveryProps = new DeliveryProperties(); + if(origDeliveryProps != null) + { + if(origDeliveryProps.hasDeliveryMode()) + { + deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode()); + } + if(origDeliveryProps.hasExchange()) + { + deliveryProps.setExchange(origDeliveryProps.getExchange()); + } + if(origDeliveryProps.hasExpiration()) + { + deliveryProps.setExpiration(origDeliveryProps.getExpiration()); + } + if(origDeliveryProps.hasPriority()) + { + deliveryProps.setPriority(origDeliveryProps.getPriority()); + } + if(origDeliveryProps.hasRoutingKey()) + { + deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey()); + } + if(origDeliveryProps.hasTimestamp()) + { + deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); + } + if(origDeliveryProps.hasTtl()) + { + deliveryProps.setTtl(origDeliveryProps.getTtl()); + } + + + } + + deliveryProps.setRedelivered(entry.isRedelivered()); + + Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); + + + xfr = batch ? new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) + : new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody()); + + if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) + { + xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); + } + else if(_flowMode == MessageFlowMode.WINDOW) + { + xfr.setCompletionListener(new Method.CompletionListener() + { + public void onComplete(Method method) + { + deferredAddCredit(1, entry.getSize()); + } + }); + } + + + _postIdSettingAction.setXfr(xfr); + if(_acceptMode == MessageAcceptMode.EXPLICIT) + { + _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this)); + } + else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED) + { + _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this)); + } + else + { + _postIdSettingAction.setAction(null); + } + + + _session.sendMessage(xfr, _postIdSettingAction); + entry.incrementDeliveryCount(); + if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) + { + forceDequeue(entry, false); + } + else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED) + { + recordUnacknowledged(entry); + } + + } + + void recordUnacknowledged(QueueEntry entry) + { + _unacknowledgedCount.incrementAndGet(); + _unacknowledgedBytes.addAndGet(entry.getSize()); + } + + private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) + { + _deferredMessageCredit += deferredMessageCredit; + _deferredSizeCredit += deferredSizeCredit; + + } + + public void flushCreditState(boolean strict) + { + if(strict || !isSuspended() || _deferredMessageCredit >= 200 + || !(_creditManager instanceof WindowCreditManager) + || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 ) + { + _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit); + _deferredMessageCredit = 0; + _deferredSizeCredit = 0l; + } + } + + private void forceDequeue(final QueueEntry entry, final boolean restoreCredit) + { + AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore()); + dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(), + new ServerTransaction.Action() + { + public void postCommit() + { + if (restoreCredit) + { + restoreCredit(entry); + } + entry.delete(); + } + + public void onRollback() + { + + } + }); + } + + void reject(final QueueEntry entry) + { + entry.setRedelivered(); + entry.routeToAlternate(null, null); + if(entry.isAcquiredBy(getSubscription())) + { + entry.delete(); + } + } + + void release(final QueueEntry entry, final boolean setRedelivered) + { + if (setRedelivered) + { + entry.setRedelivered(); + } + + if (getSessionModel().isClosing() || !setRedelivered) + { + entry.decrementDeliveryCount(); + } + + if (isMaxDeliveryLimitReached(entry)) + { + sendToDLQOrDiscard(entry); + } + else + { + entry.release(); + } + } + + protected void sendToDLQOrDiscard(QueueEntry entry) + { + final LogActor logActor = CurrentActor.get(); + final ServerMessage msg = entry.getMessage(); + + int requeues = entry.routeToAlternate(new Action<QueueEntry>() + { + @Override + public void performAction(final QueueEntry requeueEntry) + { + logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + requeueEntry.getQueue().getName())); + } + }, null); + + if (requeues == 0) + { + final AMQQueue queue = entry.getQueue(); + final Exchange alternateExchange = queue.getAlternateExchange(); + + if(alternateExchange != null) + { + logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + alternateExchange.getName())); + } + else + { + logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + queue.getName(), + msg.getRoutingKey())); + } + } + } + + private boolean isMaxDeliveryLimitReached(QueueEntry entry) + { + final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); + return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); + } + + public void queueDeleted() + { + _deleted.set(true); + } + + public boolean allocateCredit(QueueEntry entry) + { + return _creditManager.useCreditForMessage(entry.getMessage().getSize()); + } + + public void restoreCredit(QueueEntry queueEntry) + { + _creditManager.restoreCredit(1, queueEntry.getSize()); + } + + public FlowCreditManager_0_10 getCreditManager() + { + return _creditManager; + } + + + public void stop() + { + try + { + getSubscription().getSendLock(); + + updateState(State.ACTIVE, State.SUSPENDED); + _stopped.set(true); + FlowCreditManager_0_10 creditManager = getCreditManager(); + creditManager.clearCredit(); + } + finally + { + getSubscription().releaseSendLock(); + } + } + + public void addCredit(MessageCreditUnit unit, long value) + { + FlowCreditManager_0_10 creditManager = getCreditManager(); + + switch (unit) + { + case MESSAGE: + + creditManager.addCredit(value, 0L); + break; + case BYTE: + creditManager.addCredit(0l, value); + break; + } + + _stopped.set(false); + + if(creditManager.hasCredit()) + { + updateState(State.SUSPENDED, State.ACTIVE); + } + + } + + public void setFlowMode(MessageFlowMode flowMode) + { + + + _creditManager.removeListener(this); + + switch(flowMode) + { + case CREDIT: + _creditManager = new CreditCreditManager(0l,0l); + break; + case WINDOW: + _creditManager = new WindowCreditManager(0l,0l); + break; + default: + throw new RuntimeException("Unknown message flow mode: " + flowMode); + } + _flowMode = flowMode; + updateState(State.ACTIVE, State.SUSPENDED); + + _creditManager.addStateListener(this); + + } + + public boolean isStopped() + { + return _stopped.get(); + } + + public void acknowledge(QueueEntry entry) + { + // TODO Fix Store Context / cleanup + if(entry.isAcquiredBy(getSubscription())) + { + _unacknowledgedBytes.addAndGet(-entry.getSize()); + _unacknowledgedCount.decrementAndGet(); + entry.delete(); + } + } + + public void flush() throws AMQException + { + flushCreditState(true); + getSubscription().flush(); + stop(); + } + + public ServerSession getSessionModel() + { + return _session; + } + + public boolean isDurable() + { + return false; + } + + public Map<String, Object> getArguments() + { + return _arguments; + } + + public void queueEmpty() + { + } + + public void flushBatched() + { + _session.getConnection().flush(); + } + + + public long getUnacknowledgedBytes() + { + return _unacknowledgedBytes.longValue(); + } + + public long getUnacknowledgedMessages() + { + return _unacknowledgedCount.longValue(); + } +} diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java deleted file mode 100644 index 357b565365..0000000000 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ /dev/null @@ -1,944 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.messages.ChannelMessages; -import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Struct; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; - -import java.text.MessageFormat; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject -{ - private final long _subscriptionID; - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - - private static final Option[] BATCHED = new Option[] { Option.BATCH }; - - private final Lock _stateChangeLock = new ReentrantLock(); - - private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private volatile AMQQueue.Context _queueContext; - private final AtomicBoolean _deleted = new AtomicBoolean(false); - - - private FlowCreditManager_0_10 _creditManager; - - private StateListener _stateListener = new StateListener() - { - - public void stateChange(Subscription sub, State oldState, State newState) - { - CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); - } - }; - private AMQQueue _queue; - private final String _destination; - private boolean _noLocal; - private final FilterManager _filters; - private final MessageAcceptMode _acceptMode; - private final MessageAcquireMode _acquireMode; - private MessageFlowMode _flowMode; - private final ServerSession _session; - private final AtomicBoolean _stopped = new AtomicBoolean(true); - private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; - - private LogActor _logActor; - private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - private String _traceExclude; - private String _trace; - private final long _createTime = System.currentTimeMillis(); - private final AtomicLong _deliveredCount = new AtomicLong(0); - private final AtomicLong _deliveredBytes = new AtomicLong(0); - private final AtomicLong _unacknowledgedCount = new AtomicLong(0); - private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - - private final Map<String, Object> _arguments; - private int _deferredMessageCredit; - private long _deferredSizeCredit; - - - public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode, - MessageAcquireMode acquireMode, - MessageFlowMode flowMode, - FlowCreditManager_0_10 creditManager, - FilterManager filters,Map<String, Object> arguments) - { - _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); - _session = session; - _postIdSettingAction = new AddMessageDispositionListenerAction(session); - _destination = destination; - _acceptMode = acceptMode; - _acquireMode = acquireMode; - _creditManager = creditManager; - _flowMode = flowMode; - _filters = filters; - _creditManager.addStateListener(this); - _arguments = arguments == null ? Collections.<String, Object> emptyMap() : - Collections.<String, Object> unmodifiableMap(arguments); - _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED); - - } - - public void setNoLocal(boolean noLocal) - { - _noLocal = noLocal; - } - - public AMQQueue getQueue() - { - return _queue; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public void setQueue(AMQQueue queue, boolean exclusive) - { - if(getQueue() != null) - { - throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); - } - _queue = queue; - - _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES); - _trace = (String) queue.getAttribute(Queue.FEDERATION_ID); - String filterLogString = null; - - _logActor = GenericActor.getInstance(this); - if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, this, SubscriptionMessages.CREATE_LOG_HIERARCHY)) - { - filterLogString = getFilterLogString(); - CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive, - filterLogString.length() > 0)); - } - } - - public String getConsumerName() - { - return _destination; - } - - public boolean isSuspended() - { - return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension - } - - public boolean hasInterest(QueueEntry entry) - { - - - - //check that the message hasn't been rejected - if (entry.isRejectedBy(getSubscriptionID())) - { - - return false; - } - - if (entry.getMessage() instanceof MessageTransferMessage) - { - if(_noLocal) - { - Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference(); - if (connectionRef != null && connectionRef == _session.getReference()) - { - return false; - } - } - } - else - { - // no interest in messages we can't convert - if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), MessageTransferMessage.class)==null) - { - return false; - } - } - - - return checkFilters(entry); - - - } - - private boolean checkFilters(QueueEntry entry) - { - return (_filters == null) || _filters.allAllow(entry.asFilterable()); - } - - public boolean isClosed() - { - return getState() == State.CLOSED; - } - - public boolean isBrowser() - { - return _acquireMode == MessageAcquireMode.NOT_ACQUIRED; - } - - public boolean seesRequeues() - { - return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT; - } - - public void close() - { - boolean closed = false; - State state = getState(); - - _stateChangeLock.lock(); - try - { - while(!closed && state != State.CLOSED) - { - closed = _state.compareAndSet(state, State.CLOSED); - if(!closed) - { - state = getState(); - } - else - { - _stateListener.stateChange(this,state, State.CLOSED); - } - } - _creditManager.removeListener(this); - CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE()); - } - finally - { - _stateChangeLock.unlock(); - } - - - - } - - public Long getDelivered() - { - return _deliveredCount.get(); - } - - public void creditStateChanged(boolean hasCredit) - { - - if(hasCredit) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - else - { - // this is a hack to get round the issue of increasing bytes credit - _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE); - } - } - else - { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - } - } - - - public static class AddMessageDispositionListenerAction implements Runnable - { - private MessageTransfer _xfr; - private ServerSession.MessageDispositionChangeListener _action; - private ServerSession _session; - - public AddMessageDispositionListenerAction(ServerSession session) - { - _session = session; - } - - public void setXfr(MessageTransfer xfr) - { - _xfr = xfr; - } - - public void setAction(ServerSession.MessageDispositionChangeListener action) - { - _action = action; - } - - public void run() - { - if(_action != null) - { - _session.onMessageDispositionChange(_xfr, _action); - } - } - } - - private final AddMessageDispositionListenerAction _postIdSettingAction; - - public void send(final QueueEntry entry, boolean batch) throws AMQException - { - ServerMessage serverMsg = entry.getMessage(); - - - MessageTransfer xfr; - - DeliveryProperties deliveryProps; - MessageProperties messageProps = null; - - MessageTransferMessage msg; - - if(serverMsg instanceof MessageTransferMessage) - { - - msg = (MessageTransferMessage) serverMsg; - - } - else - { - MessageConverter converter = - MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class); - - - msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost()); - } - DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); - messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); - - deliveryProps = new DeliveryProperties(); - if(origDeliveryProps != null) - { - if(origDeliveryProps.hasDeliveryMode()) - { - deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode()); - } - if(origDeliveryProps.hasExchange()) - { - deliveryProps.setExchange(origDeliveryProps.getExchange()); - } - if(origDeliveryProps.hasExpiration()) - { - deliveryProps.setExpiration(origDeliveryProps.getExpiration()); - } - if(origDeliveryProps.hasPriority()) - { - deliveryProps.setPriority(origDeliveryProps.getPriority()); - } - if(origDeliveryProps.hasRoutingKey()) - { - deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey()); - } - if(origDeliveryProps.hasTimestamp()) - { - deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); - } - if(origDeliveryProps.hasTtl()) - { - deliveryProps.setTtl(origDeliveryProps.getTtl()); - } - - - } - - deliveryProps.setRedelivered(entry.isRedelivered()); - - if(_trace != null && messageProps == null) - { - messageProps = new MessageProperties(); - } - - Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); - - - xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) - : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody()); - - boolean excludeDueToFederation = false; - - if(_trace != null) - { - if(!messageProps.hasApplicationHeaders()) - { - messageProps.setApplicationHeaders(new HashMap<String,Object>()); - } - Map<String,Object> appHeaders = messageProps.getApplicationHeaders(); - String trace = (String) appHeaders.get("x-qpid.trace"); - if(trace == null) - { - trace = _trace; - } - else - { - if(_traceExclude != null) - { - excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude); - } - trace+=","+_trace; - } - appHeaders.put("x-qpid.trace",trace); - } - - if(!excludeDueToFederation) - { - if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) - { - xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); - } - else if(_flowMode == MessageFlowMode.WINDOW) - { - xfr.setCompletionListener(new Method.CompletionListener() - { - public void onComplete(Method method) - { - deferredAddCredit(1, entry.getSize()); - } - }); - } - - - _postIdSettingAction.setXfr(xfr); - if(_acceptMode == MessageAcceptMode.EXPLICIT) - { - _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this)); - } - else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED) - { - _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this)); - } - else - { - _postIdSettingAction.setAction(null); - } - - - _session.sendMessage(xfr, _postIdSettingAction); - entry.incrementDeliveryCount(); - _deliveredCount.incrementAndGet(); - _deliveredBytes.addAndGet(entry.getSize()); - if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) - { - forceDequeue(entry, false); - } - else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED) - { - recordUnacknowledged(entry); - } - } - else - { - forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW); - - } - } - - void recordUnacknowledged(QueueEntry entry) - { - _unacknowledgedCount.incrementAndGet(); - _unacknowledgedBytes.addAndGet(entry.getSize()); - } - - private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) - { - _deferredMessageCredit += deferredMessageCredit; - _deferredSizeCredit += deferredSizeCredit; - - } - - public void flushCreditState(boolean strict) - { - if(strict || !isSuspended() || _deferredMessageCredit >= 200 - || !(_creditManager instanceof WindowCreditManager) - || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 ) - { - _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit); - _deferredMessageCredit = 0; - _deferredSizeCredit = 0l; - } - } - - private void forceDequeue(final QueueEntry entry, final boolean restoreCredit) - { - AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); - dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(), - new ServerTransaction.Action() - { - public void postCommit() - { - if (restoreCredit) - { - restoreCredit(entry); - } - entry.delete(); - } - - public void onRollback() - { - - } - }); - } - - void reject(final QueueEntry entry) - { - entry.setRedelivered(); - entry.routeToAlternate(null, null); - if(entry.isAcquiredBy(this)) - { - entry.delete(); - } - } - - void release(final QueueEntry entry, final boolean setRedelivered) - { - if (setRedelivered) - { - entry.setRedelivered(); - } - - if (getSessionModel().isClosing() || !setRedelivered) - { - entry.decrementDeliveryCount(); - } - - if (isMaxDeliveryLimitReached(entry)) - { - sendToDLQOrDiscard(entry); - } - else - { - entry.release(); - } - } - - protected void sendToDLQOrDiscard(QueueEntry entry) - { - final LogActor logActor = CurrentActor.get(); - final ServerMessage msg = entry.getMessage(); - - int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction() - { - @Override - public void onEnqueue(final QueueEntry requeueEntry) - { - logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), - requeueEntry.getQueue().getName())); - } - }, null); - - if (requeues == 0) - { - final AMQQueue queue = entry.getQueue(); - final Exchange alternateExchange = queue.getAlternateExchange(); - - if(alternateExchange != null) - { - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), - alternateExchange.getName())); - } - else - { - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), - queue.getName(), - msg.getRoutingKey())); - } - } - } - - private boolean isMaxDeliveryLimitReached(QueueEntry entry) - { - final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); - return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); - } - - public void queueDeleted(AMQQueue queue) - { - _deleted.set(true); - } - - public boolean wouldSuspend(QueueEntry entry) - { - return !_creditManager.useCreditForMessage(entry.getMessage().getSize()); - } - - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public void restoreCredit(QueueEntry queueEntry) - { - _creditManager.restoreCredit(1, queueEntry.getSize()); - } - - public void onDequeue(QueueEntry queueEntry) - { - // no-op for 0-10, credit restored by completing command. - } - - public void releaseQueueEntry(QueueEntry queueEntry) - { - // no-op for 0-10, credit restored by completing command. - } - - public void setStateListener(StateListener listener) - { - _stateListener = listener; - } - - public State getState() - { - return _state.get(); - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context queueContext) - { - _queueContext = queueContext; - } - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public void set(String key, Object value) - { - _properties.put(key, value); - } - - public Object get(String key) - { - return _properties.get(key); - } - - - public FlowCreditManager_0_10 getCreditManager() - { - return _creditManager; - } - - - public void stop() - { - try - { - getSendLock(); - - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - _stopped.set(true); - FlowCreditManager_0_10 creditManager = getCreditManager(); - creditManager.clearCredit(); - } - finally - { - releaseSendLock(); - } - } - - public void addCredit(MessageCreditUnit unit, long value) - { - FlowCreditManager_0_10 creditManager = getCreditManager(); - - switch (unit) - { - case MESSAGE: - - creditManager.addCredit(value, 0L); - break; - case BYTE: - creditManager.addCredit(0l, value); - break; - } - - _stopped.set(false); - - if(creditManager.hasCredit()) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - } - - } - - public void setFlowMode(MessageFlowMode flowMode) - { - - - _creditManager.removeListener(this); - - switch(flowMode) - { - case CREDIT: - _creditManager = new CreditCreditManager(0l,0l); - break; - case WINDOW: - _creditManager = new WindowCreditManager(0l,0l); - break; - default: - throw new RuntimeException("Unknown message flow mode: " + flowMode); - } - _flowMode = flowMode; - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - - _creditManager.addStateListener(this); - - } - - public boolean isStopped() - { - return _stopped.get(); - } - - public boolean acquires() - { - return _acquireMode == MessageAcquireMode.PRE_ACQUIRED; - } - - public void acknowledge(QueueEntry entry) - { - // TODO Fix Store Context / cleanup - if(entry.isAcquiredBy(this)) - { - _unacknowledgedBytes.addAndGet(-entry.getSize()); - _unacknowledgedCount.decrementAndGet(); - entry.delete(); - } - } - - public void flush() throws AMQException - { - flushCreditState(true); - _queue.flushSubscription(this); - stop(); - } - - public long getSubscriptionID() - { - return _subscriptionID; - } - - public LogActor getLogActor() - { - return _logActor; - } - - public boolean isTransient() - { - return false; - } - - public ServerSession getSessionModel() - { - return _session; - } - - public boolean isBrowsing() - { - return _acquireMode == MessageAcquireMode.NOT_ACQUIRED; - } - - public boolean isExclusive() - { - return getQueue().hasExclusiveSubscriber(); - } - - public boolean isDurable() - { - return false; - } - - - public boolean isExplicitAcknowledge() - { - return _acceptMode == MessageAcceptMode.EXPLICIT; - } - - public String getCreditMode() - { - return _flowMode.toString(); - } - - public String getName() - { - return _destination; - } - - public Map<String, Object> getArguments() - { - return _arguments; - } - - public boolean isSessionTransactional() - { - return _session.isTransactional(); - } - - public void queueEmpty() - { - } - - public long getCreateTime() - { - return _createTime; - } - - public String toLogString() - { - String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), - _queue.getName()); - String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "(" - // queueString is "vh(/{0})/qu({1}) " so need to trim - + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] "; - return result; - } - - private String getFilterLogString() - { - StringBuilder filterLogString = new StringBuilder(); - String delimiter = ", "; - boolean hasEntries = false; - if (_filters != null && _filters.hasFilters()) - { - filterLogString.append(_filters.toString()); - hasEntries = true; - } - - if (isBrowser()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Browser"); - hasEntries = true; - } - - if (isDurable()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Durable"); - hasEntries = true; - } - - return filterLogString.toString(); - } - - public LogSubject getLogSubject() - { - return (LogSubject) this; - } - - - public void flushBatched() - { - _session.getConnection().flush(); - } - - public long getBytesOut() - { - return _deliveredBytes.longValue(); - } - - public long getMessagesOut() - { - return _deliveredCount.longValue(); - } - - public long getUnacknowledgedBytes() - { - return _unacknowledgedBytes.longValue(); - } - - public long getUnacknowledgedMessages() - { - return _unacknowledgedCount.longValue(); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index c6d4151628..9df1e7b89b 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -85,6 +85,7 @@ import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; @@ -1256,7 +1257,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { BaseQueue queue = _destinationQueues.get(i); - BaseQueue.PostEnqueueAction action; + Action<QueueEntry> action; if(immediate) { @@ -1267,7 +1268,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F action = null; } - queue.enqueue(message, isTransactional(), action); + queue.enqueue(message, action); if(queue instanceof AMQQueue) { @@ -1295,14 +1296,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - private class ImmediateAction implements BaseQueue.PostEnqueueAction + private class ImmediateAction implements Action<QueueEntry> { public ImmediateAction() { } - public void onEnqueue(QueueEntry entry) + public void performAction(QueueEntry entry) { AMQQueue queue = entry.getQueue(); @@ -1310,11 +1311,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { ServerTransaction txn = new LocalTransaction(_messageStore); - Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); - entries.add(entry); final AMQMessage message = (AMQMessage) entry.getMessage(); - txn.dequeue(queue, entry.getMessage(), - new MessageAcknowledgeAction(entries) + MessageReference ref = message.newReference(); + entry.delete(); + txn.dequeue(queue, message, + new ServerTransaction.Action() { @Override public void postCommit() @@ -1336,11 +1337,17 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { throw new RuntimeException(e); } - super.postCommit(); + } + + @Override + public void onRollback() + { + } } ); txn.commit(); + ref.release(); } @@ -1352,10 +1359,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction + private final class CapacityCheckAction implements Action<QueueEntry> { @Override - public void onEnqueue(final QueueEntry entry) + public void performAction(final QueueEntry entry) { AMQQueue queue = entry.getQueue(); queue.checkCapacity(AMQChannel.this); @@ -1398,10 +1405,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { try { - for(QueueEntry entry : _ackedMessages) - { - entry.release(); - } + for(QueueEntry entry : _ackedMessages) + { + entry.release(); + } } finally { @@ -1576,10 +1583,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { final ServerMessage msg = rejectedQueueEntry.getMessage(); - int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction() + int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>() { @Override - public void onEnqueue(final QueueEntry requeueEntry) + public void performAction(final QueueEntry requeueEntry) { _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), requeueEntry.getQueue().getName())); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index c7a84fa3b6..8d45160848 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -1678,7 +1678,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi props, _channelId, deliveryTag, - ((SubscriptionImpl)sub).getConsumerTag()); + new AMQShortString(sub.getName())); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java index 93b51a0567..05f35748ee 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java @@ -25,10 +25,14 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.DelegatingSubscription; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionTarget; public class SubscriptionFactoryImpl implements SubscriptionFactory { @@ -47,27 +51,30 @@ public class SubscriptionFactoryImpl implements SubscriptionFactory return createSubscription(channel, protocolSession, consumerTag, acks, filters, - noLocal, - creditManager, - clientMethod, - recordMethod - ); + noLocal, + creditManager, + clientMethod, + recordMethod + ); } + public Subscription createSubscription(final AMQChannel channel, - final AMQProtocolSession protocolSession, - final AMQShortString consumerTag, - final boolean acks, - final FieldTable filters, - final boolean noLocal, - final FlowCreditManager creditManager, - final ClientDeliveryMethod clientMethod, - final RecordDeliveryMethod recordMethod - ) + final AMQProtocolSession protocolSession, + final AMQShortString consumerTag, + final boolean acks, + final FieldTable filters, + final boolean noLocal, + final FlowCreditManager creditManager, + final ClientDeliveryMethod clientMethod, + final RecordDeliveryMethod recordMethod + ) throws AMQException { boolean isBrowser; - + SubscriptionTarget_0_8 target; + Subscription subscription; + if (filters != null) { Boolean isBrowserObj = (Boolean) filters.get(AMQPFilterTypes.NO_CONSUME.getValue()); @@ -78,30 +85,56 @@ public class SubscriptionFactoryImpl implements SubscriptionFactory isBrowser = false; } + final FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters)); + boolean acquires; + boolean seesReuques; + boolean isTransient; if(isBrowser) { - return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); + target = new SubscriptionTarget_0_8.BrowserSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod); + acquires = false; + seesReuques = false; + isTransient = true; } else if(acks) { - return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); + target = new SubscriptionTarget_0_8.AckSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod); + acquires = true; + seesReuques = true; + isTransient = false; } else { - return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); + target = new SubscriptionTarget_0_8.NoAckSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod); + acquires = true; + seesReuques = true; + isTransient = false; } + subscription = + new DelegatingSubscription<SubscriptionTarget_0_8>(filterManager, AMQMessage.class, acquires, seesReuques, AMQShortString.toString(consumerTag),isTransient,target); + target.setSubscription(subscription); + return subscription; } - public SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(final AMQChannel channel, - final AMQProtocolSession session, - final AMQShortString consumerTag, - final FieldTable filters, - final boolean noLocal, - final FlowCreditManager creditManager, - final ClientDeliveryMethod deliveryMethod, - final RecordDeliveryMethod recordMethod) throws AMQException + + + public Subscription createBasicGetNoAckSubscription(final AMQChannel channel, + final AMQProtocolSession session, + final AMQShortString consumerTag, + final FieldTable filters, + final boolean noLocal, + final FlowCreditManager creditManager, + final ClientDeliveryMethod deliveryMethod, + final RecordDeliveryMethod recordMethod) throws AMQException { - return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod); + SubscriptionTarget_0_8 target = new SubscriptionTarget_0_8.NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + + Subscription subscription; + final FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters)); + subscription = + new DelegatingSubscription<SubscriptionTarget_0_8>(filterManager, AMQMessage.class, true, true, AMQShortString.toString(consumerTag),true,target); + target.setSubscription(subscription); + return subscription; } public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java deleted file mode 100644 index 7c52fbe3b0..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ /dev/null @@ -1,858 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.filter.FilterManagerFactory; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.SubscriptionActor; -import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag - * that was given out by the broker and the channel id. <p/> - */ -public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener -{ - - private StateListener _stateListener = new StateListener() - { - - public void stateChange(Subscription sub, State oldState, State newState) - { - - } - }; - - - private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private volatile AMQQueue.Context _queueContext; - - private final ClientDeliveryMethod _deliveryMethod; - private final RecordDeliveryMethod _recordMethod; - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - - private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - - private final Lock _stateChangeLock; - - private final long _subscriptionID; - private LogSubject _logSubject; - private LogActor _logActor; - private final AtomicLong _deliveredCount = new AtomicLong(0); - private final AtomicLong _deliveredBytes = new AtomicLong(0); - - private final AtomicLong _unacknowledgedCount = new AtomicLong(0); - private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - - private long _createTime = System.currentTimeMillis(); - - - static final class BrowserSubscription extends SubscriptionImpl - { - public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - } - - - public boolean isBrowser() - { - return true; - } - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * - * @param entry - * @param batch - * @throws AMQException - */ - @Override - public void send(QueueEntry entry, boolean batch) throws AMQException - { - // We don't decrement the reference here as we don't want to consume the message - // but we do want to send it to the client. - - synchronized (getChannel()) - { - long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); - } - - } - - @Override - public boolean wouldSuspend(QueueEntry msg) - { - return false; - } - - } - - public static class NoAckSubscription extends SubscriptionImpl - { - private final AutoCommitTransaction _txn; - - public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - _txn = new AutoCommitTransaction(protocolSession.getVirtualHost().getMessageStore()); - } - - - public boolean isBrowser() - { - return false; - } - - @Override - public boolean isExplicitAcknowledge() - { - return false; - } - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * - * @param entry The message to send - * @param batch - * @throws AMQException - */ - @Override - public void send(QueueEntry entry, boolean batch) throws AMQException - { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. - - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. - - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - _txn.dequeue(getQueue(), entry.getMessage(), NOOP); - - ServerMessage message = entry.getMessage(); - MessageReference ref = message.newReference(); - InstanceProperties props = entry.getInstanceProperties(); - entry.delete(); - - synchronized (getChannel()) - { - getChannel().getProtocolSession().setDeferFlush(batch); - long deliveryTag = getChannel().getNextDeliveryTag(); - - sendToClient(message, props, deliveryTag); - - } - ref.release(); - - - } - - @Override - public boolean wouldSuspend(QueueEntry msg) - { - return false; - } - - private static final ServerTransaction.Action NOOP = - new ServerTransaction.Action() - { - @Override - public void postCommit() - { - } - - @Override - public void onRollback() - { - } - }; - } - - /** - * NoAck Subscription for use with BasicGet method. - */ - public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription - { - public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - } - - public boolean isTransient() - { - return true; - } - - public boolean wouldSuspend(QueueEntry msg) - { - return !getCreditManager().useCreditForMessage(msg.getMessage().getSize()); - } - - } - - static final class AckSubscription extends SubscriptionImpl - { - public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - } - - - public boolean isBrowser() - { - return false; - } - - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * - * @param entry The message to send - * @param batch - * @throws AMQException - */ - @Override - public void send(QueueEntry entry, boolean batch) throws AMQException - { - - - synchronized (getChannel()) - { - getChannel().getProtocolSession().setDeferFlush(batch); - long deliveryTag = getChannel().getNextDeliveryTag(); - - addUnacknowledgedMessage(entry); - recordMessageDelivery(entry, deliveryTag); - sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); - entry.incrementDeliveryCount(); - - } - } - - - - } - - - private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class); - - private final AMQChannel _channel; - - private final AMQShortString _consumerTag; - - - private boolean _noLocal; - - private final FlowCreditManager _creditManager; - - private FilterManager _filters; - - private final Boolean _autoClose; - - private AMQQueue _queue; - private final AtomicBoolean _deleted = new AtomicBoolean(false); - - - - - public SubscriptionImpl(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable arguments, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); - _channel = channel; - _consumerTag = consumerTag; - - _creditManager = creditManager; - creditManager.addStateListener(this); - - _noLocal = noLocal; - - - _filters = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments)); - - _deliveryMethod = deliveryMethod; - _recordMethod = recordMethod; - - - _stateChangeLock = new ReentrantLock(); - - - if (arguments != null) - { - Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue()); - if (autoClose != null) - { - _autoClose = (Boolean) autoClose; - } - else - { - _autoClose = false; - } - } - else - { - _autoClose = false; - } - - } - - public AMQSessionModel getSessionModel() - { - return _channel; - } - - public Long getDelivered() - { - return _deliveredCount.get(); - } - - public synchronized void setQueue(AMQQueue queue, boolean exclusive) - { - if(getQueue() != null) - { - throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); - } - _queue = queue; - - _logSubject = new SubscriptionLogSubject(this); - _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this); - - if (CurrentActor.get().getRootMessageLogger(). - isMessageEnabled(CurrentActor.get(), _logSubject, SubscriptionMessages.CREATE_LOG_HIERARCHY)) - { - // Get the string value of the filters - String filterLogString = null; - if (_filters != null && _filters.hasFilters()) - { - filterLogString = _filters.toString(); - } - - if (isAutoClose()) - { - if (filterLogString == null) - { - filterLogString = ""; - } - else - { - filterLogString += ","; - } - filterLogString += "AutoClose"; - } - - if (isBrowser()) - { - // We do not need to check for null here as all Browsers are AutoClose - filterLogString +=",Browser"; - } - - CurrentActor.get(). - message(_logSubject, - SubscriptionMessages.CREATE(filterLogString, - queue.isDurable() && exclusive, - filterLogString != null)); - } - } - - public String toString() - { - String subscriber = "[channel=" + _channel + - ", consumerTag=" + _consumerTag + - ", session=" + getProtocolSession().getKey() ; - - return subscriber + "]"; - } - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * - * @param entry - * @param batch - * @throws AMQException - */ - abstract public void send(QueueEntry entry, boolean batch) throws AMQException; - - - public boolean isSuspended() - { - return !isActive() || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped(); - } - - /** - * Callback indicating that a queue has been deleted. - * - * @param queue The queue to delete - */ - public void queueDeleted(AMQQueue queue) - { - _deleted.set(true); - } - - public boolean hasInterest(QueueEntry entry) - { - //check that the message hasn't been rejected - if (entry.isRejectedBy(getSubscriptionID())) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Subscription:" + this + " rejected message:" + entry); - } - } - - if(entry.getMessage() instanceof AMQMessage) - { - if (_noLocal) - { - AMQMessage message = (AMQMessage) entry.getMessage(); - - final Object publisherReference = message.getConnectionReference(); - - // We don't want local messages so check to see if message is one we sent - Object localReference = getProtocolSession().getReference(); - - if(publisherReference != null && publisherReference.equals(localReference)) - { - return false; - } - } - } - else - { - // No interest in messages we can't convert to AMQMessage - if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), AMQMessage.class)==null) - { - return false; - } - } - - - if (_logger.isDebugEnabled()) - { - _logger.debug("(" + this + ") checking filters for message (" + entry); - } - return checkFilters(entry); - - } - - private boolean checkFilters(QueueEntry msg) - { - return (_filters == null) || _filters.allAllow(msg.asFilterable()); - } - - public boolean isAutoClose() - { - return _autoClose; - } - - public FlowCreditManager getCreditManager() - { - return _creditManager; - } - - - public void close() - { - boolean closed = false; - State state = getState(); - - _stateChangeLock.lock(); - try - { - while(!closed && state != State.CLOSED) - { - closed = _state.compareAndSet(state, State.CLOSED); - if(!closed) - { - state = getState(); - } - else - { - _stateListener.stateChange(this,state, State.CLOSED); - } - } - _creditManager.removeListener(this); - } - finally - { - _stateChangeLock.unlock(); - } - //Log Subscription closed - CurrentActor.get().message(_logSubject, SubscriptionMessages.CLOSE()); - } - - public boolean isClosed() - { - return getState() == State.CLOSED; - } - - - public boolean wouldSuspend(QueueEntry msg) - { - return !_creditManager.useCreditForMessage(msg.getMessage().getSize()); - } - - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public AMQChannel getChannel() - { - return _channel; - } - - public AMQShortString getConsumerTag() - { - return _consumerTag; - } - - public String getConsumerName() - { - return _consumerTag == null ? null : _consumerTag.asString(); - } - - public long getSubscriptionID() - { - return _subscriptionID; - } - - public AMQProtocolSession getProtocolSession() - { - return _channel.getProtocolSession(); - } - - public LogActor getLogActor() - { - return _logActor; - } - - public AMQQueue getQueue() - { - return _queue; - } - - public void onDequeue(final QueueEntry queueEntry) - { - restoreCredit(queueEntry); - } - - public void releaseQueueEntry(final QueueEntry queueEntry) - { - restoreCredit(queueEntry); - } - - public void restoreCredit(final QueueEntry queueEntry) - { - _creditManager.restoreCredit(1, queueEntry.getSize()); - } - - public void creditStateChanged(boolean hasCredit) - { - - if(hasCredit) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - else - { - // this is a hack to get round the issue of increasing bytes credit - _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE); - } - } - else - { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - } - CurrentActor.get().message(_logSubject,SubscriptionMessages.STATE(_state.get().toString())); - } - - public State getState() - { - return _state.get(); - } - - - public void setStateListener(final StateListener listener) - { - _stateListener = listener; - } - - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context context) - { - _queueContext = context; - } - - - protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) - throws AMQException - { - _deliveryMethod.deliverToClient(this, message, props, deliveryTag); - _deliveredCount.incrementAndGet(); - _deliveredBytes.addAndGet(message.getSize()); - } - - - protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag) - { - _recordMethod.recordMessageDelivery(this,entry,deliveryTag); - } - - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public void confirmAutoClose() - { - ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag()); - } - - public boolean acquires() - { - return !isBrowser(); - } - - public boolean seesRequeues() - { - return !isBrowser(); - } - - public boolean isTransient() - { - return false; - } - - public void set(String key, Object value) - { - _properties.put(key, value); - } - - public Object get(String key) - { - return _properties.get(key); - } - - - public void setNoLocal(boolean noLocal) - { - _noLocal = noLocal; - } - - abstract boolean isBrowser(); - - public String getCreditMode() - { - return "WINDOW"; - } - - public boolean isBrowsing() - { - return isBrowser(); - } - - public boolean isExplicitAcknowledge() - { - return true; - } - - public boolean isDurable() - { - return false; - } - - public boolean isExclusive() - { - return getQueue().hasExclusiveSubscriber(); - } - - public String getName() - { - return String.valueOf(_consumerTag); - } - - public Map<String, Object> getArguments() - { - return null; - } - - public boolean isSessionTransactional() - { - return _channel.isTransactional(); - } - - public long getCreateTime() - { - return _createTime; - } - - public void queueEmpty() throws AMQException - { - if (isAutoClose()) - { - _queue.unregisterSubscription(this); - - confirmAutoClose(); - } - } - - public void flushBatched() - { - _channel.getProtocolSession().setDeferFlush(false); - - _channel.getProtocolSession().flushBatched(); - } - - public long getBytesOut() - { - return _deliveredBytes.longValue(); - } - - public long getMessagesOut() - { - return _deliveredCount.longValue(); - } - - - protected void addUnacknowledgedMessage(QueueEntry entry) - { - final long size = entry.getSize(); - _unacknowledgedBytes.addAndGet(size); - _unacknowledgedCount.incrementAndGet(); - entry.addStateChangeListener(new QueueEntry.StateChangeListener() - { - public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState) - { - if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED)) - { - _unacknowledgedBytes.addAndGet(-size); - _unacknowledgedCount.decrementAndGet(); - entry.removeStateChangeListener(this); - } - } - }); - } - - public long getUnacknowledgedBytes() - { - return _unacknowledgedBytes.longValue(); - } - - public long getUnacknowledgedMessages() - { - return _unacknowledgedCount.longValue(); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java new file mode 100644 index 0000000000..6c91e6130e --- /dev/null +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java @@ -0,0 +1,506 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.SubscriptionMessages; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.subscription.AbstractSubscriptionTarget; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.RecordDeliveryMethod; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.StateChangeListener; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag + * that was given out by the broker and the channel id. <p/> + */ +public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener +{ + + private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener = + new StateChangeListener<QueueEntry, QueueEntry.State>() + { + @Override + public void stateChanged(final QueueEntry entry, + final QueueEntry.State oldSate, + final QueueEntry.State newState) + { + if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED)) + { + restoreCredit(entry); + } + entry.removeStateChangeListener(this); + } + }; + + private final ClientDeliveryMethod _deliveryMethod; + private final RecordDeliveryMethod _recordMethod; + + private final AtomicLong _unacknowledgedCount = new AtomicLong(0); + private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); + private Subscription _subscription; + + + static final class BrowserSubscription extends SubscriptionTarget_0_8 + { + public BrowserSubscription(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + super(channel, consumerTag, + filters, creditManager, deliveryMethod, recordMethod); + } + + /** + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. + * + * + * @param entry + * @param batch + * @throws org.apache.qpid.AMQException + */ + @Override + public void send(QueueEntry entry, boolean batch) throws AMQException + { + // We don't decrement the reference here as we don't want to consume the message + // but we do want to send it to the client. + + synchronized (getChannel()) + { + long deliveryTag = getChannel().getNextDeliveryTag(); + sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + } + + } + + @Override + public boolean allocateCredit(QueueEntry msg) + { + return true; + } + + } + + public static class NoAckSubscription extends SubscriptionTarget_0_8 + { + private final AutoCommitTransaction _txn; + + public NoAckSubscription(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + + _txn = new AutoCommitTransaction(channel.getVirtualHost().getMessageStore()); + } + + /** + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. + * + * + * @param entry The message to send + * @param batch + * @throws org.apache.qpid.AMQException + */ + @Override + public void send(QueueEntry entry, boolean batch) throws AMQException + { + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. + + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. + + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + _txn.dequeue(getSubscription().getQueue(), entry.getMessage(), NOOP); + + ServerMessage message = entry.getMessage(); + MessageReference ref = message.newReference(); + InstanceProperties props = entry.getInstanceProperties(); + entry.delete(); + + synchronized (getChannel()) + { + getChannel().getProtocolSession().setDeferFlush(batch); + long deliveryTag = getChannel().getNextDeliveryTag(); + + sendToClient(message, props, deliveryTag); + + } + ref.release(); + + + } + + @Override + public boolean allocateCredit(QueueEntry msg) + { + return true; + } + + private static final ServerTransaction.Action NOOP = + new ServerTransaction.Action() + { + @Override + public void postCommit() + { + } + + @Override + public void onRollback() + { + } + }; + } + + /** + * NoAck Subscription for use with BasicGet method. + */ + public static final class GetNoAckSubscription extends SubscriptionTarget_0_8.NoAckSubscription + { + public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, + AMQShortString consumerTag, FieldTable filters, + boolean noLocal, FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + } + + public boolean allocateCredit(QueueEntry msg) + { + return getCreditManager().useCreditForMessage(msg.getMessage().getSize()); + } + + } + + static final class AckSubscription extends SubscriptionTarget_0_8 + { + public AckSubscription(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + } + + /** + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. + * + * + * @param entry The message to send + * @param batch + * @throws org.apache.qpid.AMQException + */ + @Override + public void send(QueueEntry entry, boolean batch) throws AMQException + { + + + synchronized (getChannel()) + { + getChannel().getProtocolSession().setDeferFlush(batch); + long deliveryTag = getChannel().getNextDeliveryTag(); + + addUnacknowledgedMessage(entry); + recordMessageDelivery(entry, deliveryTag); + entry.addStateChangeListener(getReleasedStateChangeListener()); + sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + entry.incrementDeliveryCount(); + + } + } + + + + } + + + private static final Logger _logger = Logger.getLogger(SubscriptionTarget_0_8.class); + + private final AMQChannel _channel; + + private final AMQShortString _consumerTag; + + private final FlowCreditManager _creditManager; + + private final Boolean _autoClose; + + private final AtomicBoolean _deleted = new AtomicBoolean(false); + + + + + public SubscriptionTarget_0_8(AMQChannel channel, + AMQShortString consumerTag, + FieldTable arguments, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + super(State.ACTIVE); + + _channel = channel; + _consumerTag = consumerTag; + + _creditManager = creditManager; + creditManager.addStateListener(this); + + _deliveryMethod = deliveryMethod; + _recordMethod = recordMethod; + + if (arguments != null) + { + Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue()); + if (autoClose != null) + { + _autoClose = (Boolean) autoClose; + } + else + { + _autoClose = false; + } + } + else + { + _autoClose = false; + } + } + + public void setSubscription(Subscription subscription) + { + _subscription = subscription; + } + + public Subscription getSubscription() + { + return _subscription; + } + + public AMQSessionModel getSessionModel() + { + return _channel; + } + + public String toString() + { + String subscriber = "[channel=" + _channel + + ", consumerTag=" + _consumerTag + + ", session=" + getProtocolSession().getKey() ; + + return subscriber + "]"; + } + + public boolean isSuspended() + { + return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped(); + } + + /** + * Callback indicating that a queue has been deleted. + * + */ + public void queueDeleted() + { + _deleted.set(true); + } + + public boolean isAutoClose() + { + return _autoClose; + } + + public FlowCreditManager getCreditManager() + { + return _creditManager; + } + + + public boolean close() + { + boolean closed = false; + State state = getState(); + + getSubscription().getSendLock(); + try + { + while(!closed && state != State.CLOSED) + { + closed = updateState(state, State.CLOSED); + if(!closed) + { + state = getState(); + } + } + _creditManager.removeListener(this); + return closed; + } + finally + { + getSubscription().releaseSendLock(); + } + } + + + public boolean allocateCredit(QueueEntry msg) + { + return _creditManager.useCreditForMessage(msg.getMessage().getSize()); + } + + public AMQChannel getChannel() + { + return _channel; + } + + public AMQShortString getConsumerTag() + { + return _consumerTag; + } + + public AMQProtocolSession getProtocolSession() + { + return _channel.getProtocolSession(); + } + + public void restoreCredit(final QueueEntry queueEntry) + { + _creditManager.restoreCredit(1, queueEntry.getSize()); + } + + protected final StateChangeListener<QueueEntry, QueueEntry.State> getReleasedStateChangeListener() + { + return _entryReleaseListener; + } + + public void creditStateChanged(boolean hasCredit) + { + + if(hasCredit) + { + if(!updateState(State.SUSPENDED, State.ACTIVE)) + { + // this is a hack to get round the issue of increasing bytes credit + getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE); + } + } + else + { + updateState(State.ACTIVE, State.SUSPENDED); + } + } + + protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) + throws AMQException + { + _deliveryMethod.deliverToClient(getSubscription(), message, props, deliveryTag); + + } + + + protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag) + { + _recordMethod.recordMessageDelivery(getSubscription(),entry,deliveryTag); + } + + + public void confirmAutoClose() + { + ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag()); + } + + public void queueEmpty() throws AMQException + { + if (isAutoClose()) + { + close(); + confirmAutoClose(); + } + } + + public void flushBatched() + { + _channel.getProtocolSession().setDeferFlush(false); + + _channel.getProtocolSession().flushBatched(); + } + + protected void addUnacknowledgedMessage(QueueEntry entry) + { + final long size = entry.getSize(); + _unacknowledgedBytes.addAndGet(size); + _unacknowledgedCount.incrementAndGet(); + entry.addStateChangeListener(new StateChangeListener<QueueEntry, QueueEntry.State>() + { + public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState) + { + if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED)) + { + _unacknowledgedBytes.addAndGet(-size); + _unacknowledgedCount.decrementAndGet(); + entry.removeStateChangeListener(this); + } + } + }); + } + + public long getUnacknowledgedBytes() + { + return _unacknowledgedBytes.longValue(); + } + + public long getUnacknowledgedMessages() + { + return _unacknowledgedCount.longValue(); + } +} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 4b569ccc71..a48ae3826e 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -162,7 +162,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } queue.registerSubscription(sub,false); - queue.flushSubscription(sub); + sub.flush(); queue.unregisterSubscription(sub); return(!singleMessageCredit.hasCredit()); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java index f2ab154b32..90f80a27e7 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java @@ -98,8 +98,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR " on channel:" + channel.debugIdentity()); } - message.reject(); - if (body.getRequeue()) { channel.requeue(deliveryTag); @@ -110,7 +108,9 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR } else { - final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); + message.reject(); + + final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag); if (maxDeliveryCountEnabled) { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 3fdce83c2a..263175d590 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; @@ -134,8 +135,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } }; protocolConnection.addSessionCloseTask(sessionCloseTask); - queue.addQueueDeleteTask(new AMQQueue.Task() { - public void doTask(AMQQueue queue) throws AMQException + queue.addQueueDeleteTask(new Action<AMQQueue>() { + public void performAction(AMQQueue queue) { protocolConnection.removeSessionCloseTask(sessionCloseTask); } @@ -245,9 +246,9 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar session.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) + public void performAction(AMQQueue queue) { session.removeSessionCloseTask(deleteQueueTask); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index ef0837b3c6..6d3e758144 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -60,7 +60,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { private static final Logger _logger = Logger.getLogger(InternalTestProtocolSession.class); // ChannelID(LIST) -> LinkedList<Pair> - private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers; + private final Map<Integer, Map<String, LinkedList<DeliveryPair>>> _channelDelivers; private AtomicInteger _deliveryCount = new AtomicInteger(0); private static final AtomicLong ID_GENERATOR = new AtomicLong(0); @@ -68,7 +68,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null); - _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); + _channelDelivers = new HashMap<Integer, Map<String, LinkedList<DeliveryPair>>>(); setTestAuthorizedSubject(); setVirtualHost(virtualHost); @@ -117,7 +117,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { synchronized (_channelDelivers) { - List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag); + List<DeliveryPair> all =_channelDelivers.get(channelId).get(AMQShortString.toString(consumerTag)); if (all == null) { @@ -153,23 +153,23 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr synchronized (_channelDelivers) { - Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId); + Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId); if (consumers == null) { - consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); + consumers = new HashMap<String, LinkedList<DeliveryPair>>(); _channelDelivers.put(channelId, consumers); } - LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag); + LinkedList<DeliveryPair> consumerDelivers = consumers.get(AMQShortString.toString(consumerTag)); if (consumerDelivers == null) { consumerDelivers = new LinkedList<DeliveryPair>(); - consumers.put(consumerTag, consumerDelivers); + consumers.put(consumerTag.toString(), consumerDelivers); } - consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg)); + consumerDelivers.add(new DeliveryPair(deliveryTag, msg)); } } @@ -254,20 +254,20 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr synchronized (_channelDelivers) { - Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId); + Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId); if (consumers == null) { - consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); + consumers = new HashMap<String, LinkedList<DeliveryPair>>(); _channelDelivers.put(_channelId, consumers); } - LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag()); + LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getName()); if (consumerDelivers == null) { consumerDelivers = new LinkedList<DeliveryPair>(); - consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers); + consumers.put(sub.getName(), consumerDelivers); } consumerDelivers.add(new DeliveryPair(deliveryTag, message)); diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 4082f22e9c..41e2fef03f 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; @@ -53,16 +54,8 @@ public class Connection_1_0 implements ConnectionEventListener private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); private final Object _reference = new Object(); - - - public static interface Task - { - public void doTask(Connection_1_0 connection); - } - - - private List<Task> _closeTasks = - Collections.synchronizedList(new ArrayList<Task>()); + private List<Action<Connection_1_0>> _closeTasks = + Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>()); @@ -98,26 +91,26 @@ public class Connection_1_0 implements ConnectionEventListener _sessions.remove(session); } - void removeConnectionCloseTask(final Task task) + void removeConnectionCloseTask(final Action<Connection_1_0> task) { _closeTasks.remove( task ); } - void addConnectionCloseTask(final Task task) + void addConnectionCloseTask(final Action<Connection_1_0> task) { _closeTasks.add( task ); } public void closeReceived() { - List<Task> taskCopy; + List<Action<Connection_1_0>> taskCopy; synchronized (_closeTasks) { - taskCopy = new ArrayList<Task>(_closeTasks); + taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks); } - for(Task task : taskCopy) + for(Action<Connection_1_0> task : taskCopy) { - task.doTask(this); + task.performAction(this); } synchronized (_closeTasks) { diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 4abf1bf76b..400bc1d085 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -69,6 +69,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler @@ -167,7 +168,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY); + _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY, messageFilter == null ? null : new SimpleFilterManager(messageFilter)); } else if(destination instanceof ExchangeDestination) { @@ -309,10 +310,10 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS final String queueName = name; final AMQQueue tempQueue = queue; - final Connection_1_0.Task deleteQueueTask = - new Connection_1_0.Task() + final Action<Connection_1_0> deleteQueueTask = + new Action<Connection_1_0>() { - public void doTask(Connection_1_0 session) + public void performAction(Connection_1_0 session) { if (_vhost.getQueue(queueName) == tempQueue) { @@ -331,9 +332,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) + public void performAction(AMQQueue queue) { getSession().getConnection().removeConnectionCloseTask(deleteQueueTask); } @@ -356,17 +357,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { _logger.error("Error", e); } - _subscription = new Subscription_1_0(this, qd, true); + _subscription = new Subscription_1_0(this, qd, true, messageFilter == null ? null : new SimpleFilterManager(messageFilter)); } if(_subscription != null) { _subscription.setNoLocal(noLocal); - if(messageFilter!=null) - { - _subscription.setFilters(new SimpleFilterManager(messageFilter)); - } + try { diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 823e4cb16d..51ff9c13cb 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -48,6 +48,7 @@ import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.*; @@ -343,10 +344,10 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose) { - final Connection_1_0.Task deleteQueueTask = - new Connection_1_0.Task() + final Action<Connection_1_0> deleteQueueTask = + new Action<Connection_1_0>() { - public void doTask(Connection_1_0 session) + public void performAction(Connection_1_0 session) { if (_vhost.getQueue(queueName) == tempQueue) { @@ -365,9 +366,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu _connection.addConnectionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) + public void performAction(AMQQueue queue) { _connection.removeConnectionCloseTask(deleteQueueTask); } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index 6a3f5b46e1..149f89fab1 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -22,10 +22,7 @@ package org.apache.qpid.server.protocol.v1_0; import java.nio.ByteBuffer; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; + import org.apache.qpid.AMQException; import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; @@ -41,63 +38,37 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.Modified; import org.apache.qpid.amqp_1_0.type.messaging.Released; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.subscription.AbstractSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; -class - Subscription_1_0 implements Subscription +class Subscription_1_0 extends AbstractSubscription implements Subscription { private SendingLink_1_0 _link; - private AMQQueue _queue; - - private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED); - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - private final long _id; - private final boolean _acquires; - private volatile AMQQueue.Context _queueContext; - private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - private ReentrantLock _stateChangeLock = new ReentrantLock(); - - private boolean _noLocal; - private FilterManager _filters; - private long _deliveryTag = 0L; - private StateListener _stateListener; private Binary _transactionId; - private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance() - .registerTransportLayer() - .registerMessagingLayer() - .registerTransactionLayer() - .registerSecurityLayer(); - private SectionEncoder _sectionEncoder = new SectionEncoderImpl(_typeRegistry); - - public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination) - { - this(link, destination, ((Source)link.getEndpoint().getSource()).getDistributionMode() != StdDistMode.COPY); - } + private final AMQPDescribedTypeRegistry _typeRegistry; + private final SectionEncoder _sectionEncoder; - public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires) + public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires, FilterManager filters) { + super(filters,Message_1_0.class,link.getSession().getConnectionReference(), acquires, acquires, link.getEndpoint().getName(), false); _link = link; - _queue = destination.getQueue(); - _id = getEndpoint().getLocalHandle().longValue(); - _acquires = acquires; + _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry(); + _sectionEncoder = new SectionEncoderImpl(_typeRegistry); + setQueue(destination.getQueue(),false); + updateState(State.ACTIVE, State.SUSPENDED); } private SendingLinkEndpoint getEndpoint() @@ -105,89 +76,40 @@ class return _link.getEndpoint(); } - public LogActor getLogActor() - { - return null; //TODO - } - - public boolean isTransient() - { - return true; //TODO - } - - public AMQQueue getQueue() - { - return _queue; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public void setQueue(final AMQQueue queue, final boolean exclusive) - { - //TODO - } - - public void setNoLocal(final boolean noLocal) - { - _noLocal = noLocal; - } - - public long getSubscriptionID() - { - return _id; - } - public boolean isSuspended() { return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend(); } - public boolean hasInterest(final QueueEntry entry) + public void close() { - if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference()) + boolean closed = false; + State state = getState(); + + getSendLock(); + try { - return false; + while(!closed && state != State.CLOSED) + { + closed = updateState(state, State.CLOSED); + if(!closed) + { + state = getState(); + } + else + { + getStateListener().stateChanged(this, state, State.CLOSED); + } + } } - else if(!(entry.getMessage() instanceof Message_1_0) - && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null) + finally { - return false; + releaseSendLock(); } - return checkFilters(entry); - - } - - private boolean checkFilters(final QueueEntry entry) - { - return (_filters == null) || _filters.allAllow(entry.asFilterable()); - } - - public boolean isClosed() - { - return !getEndpoint().isAttached(); } - public boolean acquires() - { - return _acquires; - } - - public boolean seesRequeues() - { - // TODO - return acquires(); - } - - public void close() - { - getEndpoint().detach(); - } - - public void send(QueueEntry entry, boolean batch) throws AMQException + protected void doSend(QueueEntry entry, boolean batch) throws AMQException { // TODO send(entry); @@ -301,7 +223,7 @@ class } else { - UnsettledAction action = _acquires + UnsettledAction action = acquires() ? new DispositionAction(tag, queueEntry) : new DoNothingAction(tag, queueEntry); @@ -315,7 +237,7 @@ class transfer.setState(state); } // TODO - need to deal with failure here - if(_acquires && _transactionId != null) + if(acquires() && _transactionId != null) { ServerTransaction txn = _link.getTransaction(_transactionId); if(txn != null) @@ -352,7 +274,7 @@ class } - public void queueDeleted(final AMQQueue queue) + public void queueDeleted() { //TODO getEndpoint().setSource(null); @@ -373,98 +295,33 @@ class } } - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } public void suspend() { synchronized(_link.getLock()) { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + if(updateState(State.ACTIVE, State.SUSPENDED)) { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED); } } } - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public void releaseQueueEntry(QueueEntry queueEntryImpl) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - - public void onDequeue(final QueueEntry queueEntry) - { - //TODO - } public void restoreCredit(final QueueEntry queueEntry) { //TODO } - public void setStateListener(final StateListener listener) - { - _stateListener = listener; - } - - public State getState() - { - return _state.get(); - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context queueContext) - { - _queueContext = queueContext; - } - - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public void set(String key, Object value) - { - _properties.put(key, value); - } - - public Object get(String key) - { - return _properties.get(key); - } - - public boolean isSessionTransactional() - { - return false; //TODO - } - public void queueEmpty() { synchronized(_link.getLock()) { if(_link.drained()) { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + if(updateState(State.ACTIVE, State.SUSPENDED)) { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED); } } } @@ -476,9 +333,9 @@ class { if(isSuspended() && getEndpoint() != null) { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) + if(updateState(State.SUSPENDED, State.ACTIVE)) { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); + getStateListener().stateChanged(this, State.SUSPENDED, State.ACTIVE); } _transactionId = _link.getTransactionId(); } @@ -640,16 +497,6 @@ class } } - public FilterManager getFilters() - { - return _filters; - } - - public void setFilters(final FilterManager filters) - { - _filters = filters; - } - @Override public AMQSessionModel getSessionModel() { @@ -658,20 +505,6 @@ class } @Override - public long getBytesOut() - { - // TODO - return 0; - } - - @Override - public long getMessagesOut() - { - // TODO - return 0; - } - - @Override public long getUnacknowledgedBytes() { // TODO @@ -685,10 +518,4 @@ class return 0; } - @Override - public String getConsumerName() - { - //TODO - return "TODO"; - } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java index 9f532ec5f7..4defbd7eb0 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java @@ -177,7 +177,8 @@ public class SubscriptionLoggingTest extends AbstractTestLogging List<String> results = findMatches(SUB_PREFIX); - assertEquals("Result set larger than expected.", 2, results.size()); + final int expected = isBroker010() ? 5 : 2; + assertEquals("Result set larger than expected.", expected, results.size()); String log = getLogMessage(results, 0); @@ -185,14 +186,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging String message = getMessageString(fromMessage(log)); assertTrue("Browser not on log message:" + message, message.contains("Browser")); - if(!isBroker010()) - { - assertTrue("AutoClose not on log message:" + message, message.contains("AutoClose")); - } // Beacause it is an auto close and we have no messages on the queue we // will get a close message - log = getLogMessage(results, 1); + log = getLogMessage(results, expected-1); validateMessageID("SUB-1002", log); } diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java index 05c8e362a1..d558e07ed7 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java @@ -67,16 +67,15 @@ public class ConnectionRestTest extends QpidRestTestCase producer.send(_session.createTextMessage("Test-" + i)); } _session.commit(); - Message m = consumer.receive(1000l); - assertNotNull("Message was not received", m); + assertNotNull("First message was not received", m); _session.commit(); // receive the rest of messages for rollback for (int i = 0; i < MESSAGE_NUMBER - 1; i++) { m = consumer.receive(1000l); - assertNotNull("Message was not received", m); + assertNotNull("Subsequent messages were not received", m); } _session.rollback(); @@ -84,7 +83,7 @@ public class ConnectionRestTest extends QpidRestTestCase for (int i = 0; i < MESSAGE_NUMBER - 1; i++) { m = consumer.receive(1000l); - assertNotNull("Message was not received", m); + assertNotNull("Message was not received after rollback", m); } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java index 54c3225cec..b6f81da690 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java @@ -32,6 +32,7 @@ import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -169,7 +170,8 @@ public class RollbackOrderTest extends QpidBrokerTestCase //Start the session now so we _connection.start(); - count.await(); + count.await(10l, TimeUnit.SECONDS); + assertEquals("Not all message received. Count should be 0.", 0, count.getCount()); for (Exception e : exceptions) { |