summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-03 02:14:59 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-03 02:14:59 +0000
commitfc08306ab1474c3bf4b30942d921a8266e4bd724 (patch)
treebc7ac40adfc2c8ebe1b486fc331b6e481cab1d2d
parenta9b950ac164bb7e2dd05ae44f99d4b728697ad65 (diff)
downloadqpid-python-fc08306ab1474c3bf4b30942d921a8266e4bd724.tar.gz
Updates to subscription
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1563758 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java39
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java10
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java9
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java13
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java25
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java85
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java3
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java344
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java73
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java3
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java189
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java20
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java62
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java26
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java26
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java2
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java15
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java8
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java34
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java25
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java27
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java22
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java6
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java4
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java103
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java62
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java582
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java944
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java41
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java87
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java858
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java506
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java6
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java9
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java24
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java25
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java20
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java11
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java255
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java9
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java7
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java4
47 files changed, 2181 insertions, 2458 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 6a959df440..ff27c8159e 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -39,8 +39,10 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -66,7 +68,7 @@ public abstract class AbstractExchange implements Exchange
private VirtualHost _virtualHost;
- private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<Exchange>> _closeTaskList = new CopyOnWriteArrayList<Action<Exchange>>();
/**
* Whether the exchange is automatically deleted once all queues have detached from it
@@ -138,6 +140,12 @@ public abstract class AbstractExchange implements Exchange
if(_closed.compareAndSet(false,true))
{
+ List<Binding> bindings = new ArrayList<Binding>(_bindings);
+ for(Binding binding : bindings)
+ {
+ removeBinding(binding);
+ }
+
if(_alternateExchange != null)
{
_alternateExchange.removeReference(this);
@@ -145,9 +153,9 @@ public abstract class AbstractExchange implements Exchange
CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
- for(Task task : _closeTaskList)
+ for(Action<Exchange> task : _closeTaskList)
{
- task.onClose(this);
+ task.performAction(this);
}
_closeTaskList.clear();
}
@@ -300,12 +308,12 @@ public abstract class AbstractExchange implements Exchange
return !_referrers.isEmpty();
}
- public void addCloseTask(final Task task)
+ public void addCloseTask(final Action<Exchange> task)
{
_closeTaskList.add(task);
}
- public void removeCloseTask(final Task task)
+ public void removeCloseTask(final Action<Exchange> task)
{
_closeTaskList.remove(task);
}
@@ -421,7 +429,7 @@ public abstract class AbstractExchange implements Exchange
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final BaseQueue.PostEnqueueAction postEnqueueAction)
+ final Action<QueueEntry> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
@@ -579,8 +587,6 @@ public abstract class AbstractExchange implements Exchange
{
doRemoveBinding(b);
queue.removeBinding(b);
- removeCloseTask(b);
- queue.removeQueueDeleteTask(b);
if (b.isDurable())
{
@@ -659,8 +665,6 @@ public abstract class AbstractExchange implements Exchange
DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
}
- queue.addQueueDeleteTask(b);
- addCloseTask(b);
queue.addBinding(b);
doAddBinding(b);
b.logCreation();
@@ -673,7 +677,7 @@ public abstract class AbstractExchange implements Exchange
}
}
- private final class BindingImpl extends Binding implements AMQQueue.Task, Task
+ private final class BindingImpl extends Binding
{
private final BindingLogSubject _logSubject;
//TODO : persist creation time
@@ -689,12 +693,6 @@ public abstract class AbstractExchange implements Exchange
}
-
- public void doTask(final AMQQueue queue) throws AMQException
- {
- removeBinding(this);
- }
-
public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
{
removeBinding(this);
@@ -729,11 +727,4 @@ public abstract class AbstractExchange implements Exchange
}
- public static interface Task
- {
- public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
- }
-
-
-
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index 71d0f8b4dd..db3464c463 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -42,8 +42,10 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange
@@ -334,7 +336,7 @@ public class DefaultExchange implements Exchange
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final BaseQueue.PostEnqueueAction postEnqueueAction)
+ final Action<QueueEntry> postEnqueueAction)
{
final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 18e912e972..af1eed9032 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -29,7 +29,9 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -105,7 +107,7 @@ public interface Exchange extends ExchangeReferrer
int send(ServerMessage message,
InstanceProperties instanceProperties,
ServerTransaction txn,
- BaseQueue.PostEnqueueAction postEnqueueAction);
+ Action<QueueEntry> postEnqueueAction);
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
index 696c59783e..850a6c9d80 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
@@ -47,7 +47,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
queueAdapter.getName(),
subscription.getSessionModel().getConnectionModel().getRemoteAddressString(),
String.valueOf(subscription.getSessionModel().getChannelId()),
- subscription.getConsumerName()), queueAdapter.getTaskExecutor());
+ subscription.getName()), queueAdapter.getTaskExecutor());
_subscription = subscription;
_queue = queueAdapter;
_session = sessionAdapter;
@@ -57,7 +57,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
public String getName()
{
- return _subscription.getConsumerName();
+ return _subscription.getName();
}
public String setName(final String currentName, final String desiredName)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 0cddd1ed3b..f6602c8071 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -132,8 +133,8 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
- void addQueueDeleteTask(final Task task);
- void removeQueueDeleteTask(final Task task);
+ void addQueueDeleteTask(Action<AMQQueue> task);
+ void removeQueueDeleteTask(Action<AMQQueue> task);
@@ -271,11 +272,6 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
}
}
- static interface Task
- {
- public void doTask(AMQQueue queue) throws AMQException;
- }
-
void configure(QueueConfiguration config);
void setExclusive(boolean exclusive);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
index 7aba1a2342..bce2bd67cc 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
@@ -24,17 +24,12 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.Action;
public interface BaseQueue extends TransactionLogResource
{
- public static interface PostEnqueueAction
- {
- public void onEnqueue(QueueEntry entry);
- }
-
void enqueue(ServerMessage message) throws AMQException;
- void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
- void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException;
+ void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException;
boolean isDurable();
boolean isDeleted();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 2aa1d1f473..212f2f65fa 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -25,6 +25,8 @@ import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
@@ -42,11 +44,6 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
}
- public static interface StateChangeListener
- {
- public void stateChanged(QueueEntry entry, State oldSate, State newState);
- }
-
public abstract class EntryState
{
private EntryState()
@@ -198,7 +195,7 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
boolean isRejectedBy(long subscriptionId);
- int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn);
+ int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn);
boolean isQueueDeleted();
@@ -206,8 +203,8 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
QueueEntry getNextValidEntry();
- void addStateChangeListener(StateChangeListener listener);
- boolean removeStateChangeListener(StateChangeListener listener);
+ void addStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+ boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener);
/**
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 461d493437..f3a9a9dcc7 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -31,6 +31,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import java.util.EnumMap;
import java.util.HashSet;
@@ -59,7 +61,7 @@ public abstract class QueueEntryImpl implements QueueEntry
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener> _stateChangeListeners;
+ private volatile Set<StateChangeListener<QueueEntry, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -231,11 +233,6 @@ public abstract class QueueEntryImpl implements QueueEntry
if(state instanceof SubscriptionAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
- Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
- if (subscription != null)
- {
- subscription.releaseQueueEntry(this);
- }
}
if(!getQueue().isDeleted())
@@ -320,8 +317,6 @@ public abstract class QueueEntryImpl implements QueueEntry
if (state instanceof SubscriptionAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
- s = ((SubscriptionAcquiredState) state).getSubscription();
- s.onDequeue(this);
}
getQueue().dequeue(this,s);
@@ -336,7 +331,7 @@ public abstract class QueueEntryImpl implements QueueEntry
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener l : _stateChangeListeners)
+ for(StateChangeListener<QueueEntry, State> l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
@@ -367,7 +362,7 @@ public abstract class QueueEntryImpl implements QueueEntry
dispose();
}
- public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
+ public int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -412,21 +407,21 @@ public abstract class QueueEntryImpl implements QueueEntry
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener listener)
+ public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
{
- Set<StateChangeListener> listeners = _stateChangeListeners;
+ Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<QueueEntry, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener listener)
+ public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
{
- Set<StateChangeListener> listeners = _stateChangeListeners;
+ Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 87d11a892e..5f79498beb 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -61,9 +61,13 @@ import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
+public class SimpleAMQQueue implements AMQQueue,
+ StateChangeListener<Subscription, Subscription.State>,
+ MessageGroupManager.SubscriptionResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -121,10 +125,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
private final AtomicInteger _consumerCountHigh = new AtomicInteger(0);
- private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
- private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
private final AtomicLong _unackedMsgCount = new AtomicLong(0);
private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
private final AtomicLong _unackedMsgBytes = new AtomicLong();
@@ -165,7 +165,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
- private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
private LogSubject _logSubject;
@@ -451,7 +451,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (isDeleted())
{
- subscription.queueDeleted(this);
+ subscription.queueDeleted();
}
}
else
@@ -505,7 +505,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
// we need to manually fire the event to the removed subscription (which was the last one left for this
// queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted(this);
+ subscription.queueDeleted();
}
}
@@ -622,18 +622,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
enqueue(message, null);
}
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
{
- enqueue(message, false, action);
- }
-
- public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
- {
-
- if(transactional)
- {
- incrementTxnEnqueueStats(message);
- }
incrementQueueCount();
incrementQueueSize(message);
@@ -715,7 +705,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if(action != null)
{
- action.onEnqueue(entry);
+ action.performAction(entry);
}
}
@@ -810,18 +800,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
getAtomicQueueCount().incrementAndGet();
}
- private void incrementTxnEnqueueStats(final ServerMessage message)
- {
- _msgTxnEnqueues.incrementAndGet();
- _byteTxnEnqueues.addAndGet(message.getSize());
- }
-
- private void incrementTxnDequeueStats(QueueEntry entry)
- {
- _msgTxnDequeues.incrementAndGet();
- _byteTxnDequeues.addAndGet(entry.getSize());
- }
-
private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
throws AMQException
{
@@ -900,11 +878,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_deliveredMessages.decrementAndGet();
}
- if(sub != null && sub.isSessionTransactional())
- {
- incrementTxnDequeueStats(entry);
- }
-
checkCapacity();
}
@@ -1039,7 +1012,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
+ public void stateChanged(Subscription sub, Subscription.State oldState, Subscription.State newState)
{
if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
{
@@ -1300,12 +1273,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
});
}
- public void addQueueDeleteTask(final Task task)
+ public void addQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.add(task);
}
- public void removeQueueDeleteTask(final Task task)
+ public void removeQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.remove(task);
}
@@ -1322,7 +1295,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (!_deleted.getAndSet(true))
{
- for (Binding b : _bindings)
+ final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
+
+ for (Binding b : bindingCopy)
{
b.getExchange().removeBinding(b);
}
@@ -1334,7 +1309,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
Subscription s = subscriptionIter.getNode().getSubscription();
if (s != null)
{
- s.queueDeleted(this);
+ s.queueDeleted();
}
}
@@ -1375,9 +1350,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- for (Task task : _deleteTaskList)
+ for (Action<AMQQueue> task : _deleteTaskList)
{
- task.doTask(this);
+ task.performAction(this);
}
_deleteTaskList.clear();
@@ -1984,7 +1959,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _notificationChecks;
}
- private final class QueueEntryListener implements QueueEntry.StateChangeListener
+ private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State>
{
private final Subscription _sub;
@@ -2076,26 +2051,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _dequeueSize.get();
}
- public long getByteTxnEnqueues()
- {
- return _byteTxnEnqueues.get();
- }
-
- public long getByteTxnDequeues()
- {
- return _byteTxnDequeues.get();
- }
-
- public long getMsgTxnEnqueues()
- {
- return _msgTxnEnqueues.get();
- }
-
- public long getMsgTxnDequeues()
- {
- return _msgTxnDequeues.get();
- }
-
public long getPersistentByteEnqueues()
{
return _persistentMessageEnqueueSize.get();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index b3566df0c4..6c63b30273 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -48,7 +49,7 @@ public class SortedQueue extends OutOfOrderQueue
return _sortedPropertyName;
}
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
{
synchronized (_sortedQueueLock)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
new file mode 100644
index 0000000000..213d8b7730
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
@@ -0,0 +1,344 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.SubscriptionActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public abstract class AbstractSubscription implements Subscription
+{
+ private final long _subscriptionID;
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
+ private final Lock _stateChangeLock = new ReentrantLock();
+ private final long _createTime = System.currentTimeMillis();
+ private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final boolean _acquires;
+ private final boolean _seesRequeues;
+ private final String _consumerName;
+ private final boolean _isTransient;
+
+
+ private final AtomicLong _deliveredCount = new AtomicLong(0);
+ private final AtomicLong _deliveredBytes = new AtomicLong(0);
+
+ private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+
+
+ private final FilterManager _filters;
+
+ private volatile AMQQueue.Context _queueContext;
+
+
+ private StateChangeListener<Subscription, State> _stateListener = new StateChangeListener<Subscription, State>()
+ {
+ public void stateChanged(Subscription sub, State oldState, State newState)
+ {
+ CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
+ }
+ };
+
+ private SubscriptionLogSubject _logSubject;
+ private AMQQueue _queue;
+ private String _traceExclude;
+ private String _trace;
+ private SubscriptionActor _logActor;
+ private final Class<? extends ServerMessage> _messageClass;
+ private final Object _sessionReference;
+ private boolean _noLocal;
+
+ protected AbstractSubscription(FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final Object sessionReference,
+ final boolean acquires,
+ final boolean seesRequeues,
+ final String consumerName, final boolean isTransient)
+ {
+ _messageClass = messageClass;
+ _sessionReference = sessionReference;
+ _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
+ _filters = filters;
+ _acquires = acquires;
+ _seesRequeues = seesRequeues;
+ _consumerName = consumerName;
+ _isTransient = isTransient;
+ }
+
+ public final long getSubscriptionID()
+ {
+ return _subscriptionID;
+ }
+
+
+ public final StateChangeListener<Subscription, State> getStateListener()
+ {
+ return _stateListener;
+ }
+
+ public final void setStateListener(StateChangeListener<Subscription, State> listener)
+ {
+ _stateListener = listener;
+ }
+
+
+ public final AMQQueue.Context getQueueContext()
+ {
+ return _queueContext;
+ }
+
+ public final void setQueueContext(AMQQueue.Context queueContext)
+ {
+ _queueContext = queueContext;
+ }
+
+
+ public State getState()
+ {
+ return _state.get();
+ }
+
+ protected boolean updateState(State from, State to)
+ {
+ return _state.compareAndSet(from, to);
+ }
+
+ public final boolean isActive()
+ {
+ return getState() == State.ACTIVE;
+ }
+
+ public final boolean isClosed()
+ {
+ return getState() == State.CLOSED;
+ }
+
+
+ public final void setNoLocal(boolean noLocal)
+ {
+ _noLocal = noLocal;
+ }
+
+
+ public final boolean hasInterest(QueueEntry entry)
+ {
+ //check that the message hasn't been rejected
+ if (entry.isRejectedBy(getSubscriptionID()))
+ {
+
+ return false;
+ }
+
+ if (entry.getMessage().getClass() == _messageClass)
+ {
+ if(_noLocal)
+ {
+ Object connectionRef = entry.getMessage().getConnectionReference();
+ if (connectionRef != null && connectionRef == _sessionReference)
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ // no interest in messages we can't convert
+ if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null)
+ {
+ return false;
+ }
+ }
+ return (_filters == null) || _filters.allAllow(entry.asFilterable());
+ }
+
+
+ protected String getFilterLogString()
+ {
+ StringBuilder filterLogString = new StringBuilder();
+ String delimiter = ", ";
+ boolean hasEntries = false;
+ if (_filters != null && _filters.hasFilters())
+ {
+ filterLogString.append(_filters.toString());
+ hasEntries = true;
+ }
+
+ if (!acquires())
+ {
+ if (hasEntries)
+ {
+ filterLogString.append(delimiter);
+ }
+ filterLogString.append("Browser");
+ hasEntries = true;
+ }
+
+ return filterLogString.toString();
+ }
+
+
+ public final boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
+ public final void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public final void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+ public final AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public final void setQueue(AMQQueue queue, boolean exclusive)
+ {
+ if(getQueue() != null)
+ {
+ throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
+ }
+ _queue = queue;
+
+ _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES);
+ _trace = (String) queue.getAttribute(Queue.FEDERATION_ID);
+
+ _logSubject = new SubscriptionLogSubject(this);
+ _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
+
+ if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logSubject, SubscriptionMessages.CREATE_LOG_HIERARCHY))
+ {
+ final String filterLogString = getFilterLogString();
+ CurrentActor.get().message(_logSubject, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
+ filterLogString.length() > 0));
+ }
+ }
+
+ protected final String getTraceExclude()
+ {
+ return _traceExclude;
+ }
+
+ protected final String getTrace()
+ {
+ return _trace;
+ }
+
+ protected final LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ public final LogActor getLogActor()
+ {
+ return _logActor;
+ }
+
+ public final long getCreateTime()
+ {
+ return _createTime;
+ }
+
+
+ public final QueueEntry.SubscriptionAcquiredState getOwningState()
+ {
+ return _owningState;
+ }
+
+
+ public final void set(String key, Object value)
+ {
+ _properties.put(key, value);
+ }
+
+ public final Object get(String key)
+ {
+ return _properties.get(key);
+ }
+
+ public final boolean acquires()
+ {
+ return _acquires;
+ }
+
+ public final boolean seesRequeues()
+ {
+ return _seesRequeues;
+ }
+
+ public final String getName()
+ {
+ return _consumerName;
+ }
+
+ public final boolean isTransient()
+ {
+ return _isTransient;
+ }
+
+
+ public final long getBytesOut()
+ {
+ return _deliveredBytes.longValue();
+ }
+
+ public final long getMessagesOut()
+ {
+ return _deliveredCount.longValue();
+ }
+
+ public final void send(final QueueEntry entry, final boolean batch) throws AMQException
+ {
+ _deliveredCount.incrementAndGet();
+ _deliveredBytes.addAndGet(entry.getMessage().getSize());
+ doSend(entry, batch);
+ }
+
+ protected abstract void doSend(final QueueEntry entry, final boolean batch) throws AMQException;
+
+ @Override
+ public final void flush() throws AMQException
+ {
+ getQueue().flushSubscription(this);
+ }
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java
new file mode 100644
index 0000000000..4d9550907e
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractSubscriptionTarget implements SubscriptionTarget
+{
+
+ private final AtomicReference<State> _state;
+ private final AtomicReference<StateChangeListener<SubscriptionTarget, State>> _stateListener =
+ new AtomicReference<StateChangeListener<SubscriptionTarget, State>>();
+
+ protected AbstractSubscriptionTarget(final State initialState)
+ {
+ _state = new AtomicReference<State>(initialState);
+ }
+
+
+ public final State getState()
+ {
+ return _state.get();
+ }
+
+ protected final boolean updateState(State from, State to)
+ {
+ if(_state.compareAndSet(from, to))
+ {
+ StateChangeListener<SubscriptionTarget, State> listener = _stateListener.get();
+ if(listener != null)
+ {
+ listener.stateChanged(this, from, to);
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+
+ public final void setStateListener(StateChangeListener<SubscriptionTarget, State> listener)
+ {
+ _stateListener.set(listener);
+ }
+
+ public final StateChangeListener<SubscriptionTarget, State> getStateListener()
+ {
+ return _stateListener.get();
+ }
+
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
index 55110c46de..9b5e4fd10a 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.subscription;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.util.StateChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,7 +241,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
return groupVal;
}
- private class GroupStateChangeListener implements QueueEntry.StateChangeListener
+ private class GroupStateChangeListener implements StateChangeListener<QueueEntry, QueueEntry.State>
{
private final Group _group;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
new file mode 100644
index 0000000000..623371c84c
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.EnumMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class DelegatingSubscription<T extends SubscriptionTarget> extends AbstractSubscription
+{
+ private static final Logger _logger = Logger.getLogger(DelegatingSubscription.class);
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ static final EnumMap<SubscriptionTarget.State, State> STATE_MAP =
+ new EnumMap<SubscriptionTarget.State, State>(SubscriptionTarget.State.class);
+
+ static
+ {
+ STATE_MAP.put(SubscriptionTarget.State.ACTIVE, State.ACTIVE);
+ STATE_MAP.put(SubscriptionTarget.State.SUSPENDED, State.SUSPENDED);
+ STATE_MAP.put(SubscriptionTarget.State.CLOSED, State.CLOSED);
+ }
+
+ private final T _target;
+
+ public DelegatingSubscription(final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final boolean acquires,
+ final boolean seesRequeues,
+ final String consumerName,
+ final boolean isTransient,
+ T target)
+ {
+ super(filters, messageClass, target.getSessionModel().getConnectionReference(),
+ acquires, seesRequeues, consumerName, isTransient);
+ _target = target;
+ _target.setStateListener(
+ new StateChangeListener<SubscriptionTarget, SubscriptionTarget.State>()
+ {
+ @Override
+ public void stateChanged(final SubscriptionTarget object,
+ final SubscriptionTarget.State oldState,
+ final SubscriptionTarget.State newState)
+ {
+ targetStateChanged(oldState, newState);
+ }
+ });
+ }
+
+ private void targetStateChanged(final SubscriptionTarget.State oldState, final SubscriptionTarget.State newState)
+ {
+ if(oldState != newState)
+ {
+ if(newState == SubscriptionTarget.State.CLOSED)
+ {
+ if(_closed.compareAndSet(false,true))
+ {
+ CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
+ }
+ }
+ else
+ {
+ CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString()));
+ }
+ }
+
+ if(newState == SubscriptionTarget.State.CLOSED && oldState != newState)
+ {
+ try
+ {
+ getQueue().unregisterSubscription(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Unable to remove to remove subscription", e);
+ throw new RuntimeException(e);
+ }
+ }
+ final StateChangeListener<Subscription, State> stateListener = getStateListener();
+ if(stateListener != null)
+ {
+ stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
+ }
+ }
+
+ public T getTarget()
+ {
+ return _target;
+ }
+
+ @Override
+ public long getUnacknowledgedBytes()
+ {
+ return _target.getUnacknowledgedBytes();
+ }
+
+ @Override
+ public long getUnacknowledgedMessages()
+ {
+ return _target.getUnacknowledgedMessages();
+ }
+
+ @Override
+ public AMQSessionModel getSessionModel()
+ {
+ return _target.getSessionModel();
+ }
+
+ @Override
+ public boolean isSuspended()
+ {
+ return _target.isSuspended();
+ }
+
+ @Override
+ public void close()
+ {
+ _target.close();
+ }
+
+ @Override
+ protected void doSend(final QueueEntry entry, final boolean batch) throws AMQException
+ {
+ _target.send(entry, batch);
+ }
+
+ @Override
+ public void flushBatched()
+ {
+ _target.flushBatched();
+ }
+
+ @Override
+ public void queueDeleted()
+ {
+ _target.queueDeleted();
+ }
+
+ @Override
+ public boolean wouldSuspend(final QueueEntry msg)
+ {
+ return !_target.allocateCredit(msg);
+ }
+
+ @Override
+ public void restoreCredit(final QueueEntry queueEntry)
+ {
+ _target.restoreCredit(queueEntry);
+ }
+
+ @Override
+ public void queueEmpty() throws AMQException
+ {
+ _target.queueEmpty();
+ }
+
+ @Override
+ public State getState()
+ {
+ return STATE_MAP.get(_target.getState());
+ }
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index fde3d3809c..f9278cf719 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
public interface Subscription
{
@@ -50,11 +51,6 @@ public interface Subscription
CLOSED
}
- public static interface StateListener
- {
- public void stateChange(Subscription sub, State oldState, State newState);
- }
-
AMQQueue getQueue();
AMQSessionModel getSessionModel();
@@ -82,7 +78,7 @@ public interface Subscription
void flushBatched();
- void queueDeleted(AMQQueue queue);
+ void queueDeleted();
boolean wouldSuspend(QueueEntry msg);
@@ -94,13 +90,9 @@ public interface Subscription
void releaseSendLock();
- void releaseQueueEntry(final QueueEntry queueEntryImpl);
-
- void onDequeue(final QueueEntry queueEntry);
-
void restoreCredit(final QueueEntry queueEntry);
- void setStateListener(final StateListener listener);
+ void setStateListener(final StateChangeListener<Subscription, State> listener);
public State getState();
@@ -115,9 +107,9 @@ public interface Subscription
public Object get(String key);
- boolean isSessionTransactional();
-
void queueEmpty() throws AMQException;
- String getConsumerName();
+ String getName();
+
+ void flush() throws AMQException;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
new file mode 100644
index 0000000000..0b0be38f42
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
+
+public interface SubscriptionTarget
+{
+
+
+ enum State
+ {
+ ACTIVE, SUSPENDED, CLOSED
+ }
+
+ State getState();
+
+ void setStateListener(StateChangeListener<SubscriptionTarget, State> listener);
+
+ long getUnacknowledgedBytes();
+
+ long getUnacknowledgedMessages();
+
+ AMQSessionModel getSessionModel();
+
+ void send(QueueEntry entry, boolean batch) throws AMQException;
+
+ void flushBatched();
+
+ void queueDeleted();
+
+ void queueEmpty() throws AMQException;
+
+ boolean allocateCredit(QueueEntry msg);
+
+ void restoreCredit(QueueEntry queueEntry);
+
+ boolean isSuspended();
+
+ boolean close();
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java b/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
new file mode 100644
index 0000000000..0d53b4d03b
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public interface Action<T>
+{
+ void performAction(T object);
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java b/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
new file mode 100644
index 0000000000..b5dc90cfb6
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public interface StateChangeListener<T, E extends Enum>
+{
+ void stateChanged(T object, E oldState, E newState);
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index b7d3cf872b..b01f1d1ebc 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -141,7 +141,7 @@ public class VirtualHostConfigRecoveryHandler implements
try
{
- queue.enqueue(message, true, null);
+ queue.enqueue(message, null);
ref.release();
}
catch (AMQException e)
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index ea2e29d40d..98ecdcdd3b 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -297,23 +298,15 @@ public class MockAMQQueue implements AMQQueue
{
}
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
{
}
- public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
- {
- }
-
public void requeue(QueueEntry entry)
{
}
- public void requeue(QueueEntryImpl storeContext, Subscription subscription)
- {
- }
-
public void dequeue(QueueEntry entry, Subscription sub)
{
}
@@ -323,11 +316,11 @@ public class MockAMQQueue implements AMQQueue
return false;
}
- public void addQueueDeleteTask(Task task)
+ public void addQueueDeleteTask(Action<AMQQueue> task)
{
}
- public void removeQueueDeleteTask(final Task task)
+ public void removeQueueDeleteTask(final Action<AMQQueue> task)
{
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index d3c866f747..de5bbc2347 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -27,6 +27,8 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
public class MockQueueEntry implements QueueEntry
{
@@ -53,7 +55,7 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public void addStateChangeListener(StateChangeListener listener)
+ public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
{
}
@@ -63,7 +65,7 @@ public class MockQueueEntry implements QueueEntry
}
- public int routeToAlternate(final BaseQueue.PostEnqueueAction action, final ServerTransaction txn)
+ public int routeToAlternate(final Action<QueueEntry> action, final ServerTransaction txn)
{
return 0;
}
@@ -137,7 +139,7 @@ public class MockQueueEntry implements QueueEntry
}
- public boolean removeStateChangeListener(StateChangeListener listener)
+ public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
{
return false;
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index b0e5a510b8..ffd64774c0 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -41,10 +41,10 @@ import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -228,9 +228,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.registerSubscription(_subscription, false);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
{
- public void onEnqueue(QueueEntry entry)
+ public void performAction(QueueEntry entry)
{
queueEntries.add(entry);
}
@@ -276,9 +276,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.registerSubscription(_subscription, false);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
{
- public void onEnqueue(QueueEntry entry)
+ public void performAction(QueueEntry entry)
{
queueEntries.add(entry);
}
@@ -323,9 +323,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.registerSubscription(_subscription, false);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
{
- public void onEnqueue(QueueEntry entry)
+ public void performAction(QueueEntry entry)
{
queueEntries.add(entry);
}
@@ -376,9 +376,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.registerSubscription(subscription2, false);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
{
- public void onEnqueue(QueueEntry entry)
+ public void performAction(QueueEntry entry)
{
queueEntries.add(entry);
}
@@ -1011,37 +1011,37 @@ public class SimpleAMQQueueTest extends QpidTestCase
//verify behaviour in face of expected state changes:
//verify a subscription going suspended->active increases the count
- queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
+ queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
//verify a subscription going active->suspended decreases the count
- queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
+ queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going suspended->closed doesn't change the count
- queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
+ queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going active->closed decreases the count
- queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
+ queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
//verify behaviour in face of unexpected state changes:
//verify a subscription going closed->active increases the count
- queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
+ queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going active->active doesn't change the count
- queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
+ queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going closed->suspended doesn't change the count
- queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
+ queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going suspended->suspended doesn't change the count
- queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
+ queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 8d1b27e272..eec1edca35 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.StateChangeListener;
import java.util.ArrayList;
import java.util.List;
@@ -47,7 +48,7 @@ public class MockSubscription implements Subscription
private boolean _closed = false;
private String tag = "mocktag";
private AMQQueue queue = null;
- private StateListener _listener = null;
+ private StateChangeListener<Subscription, State> _listener = null;
private volatile AMQQueue.Context _queueContext = null;
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
@@ -75,16 +76,22 @@ public class MockSubscription implements Subscription
_closed = true;
if (_listener != null)
{
- _listener.stateChange(this, _state, State.CLOSED);
+ _listener.stateChanged(this, _state, State.CLOSED);
}
_state = State.CLOSED;
}
- public String getConsumerName()
+ public String getName()
{
return tag;
}
+ @Override
+ public void flush() throws AMQException
+ {
+
+ }
+
public long getSubscriptionID()
{
return _subscriptionID;
@@ -202,7 +209,7 @@ public class MockSubscription implements Subscription
return false;
}
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted()
{
}
@@ -211,18 +218,10 @@ public class MockSubscription implements Subscription
_stateChangeLock.unlock();
}
- public void onDequeue(QueueEntry queueEntry)
- {
- }
-
public void restoreCredit(QueueEntry queueEntry)
{
}
- public void releaseQueueEntry(QueueEntry queueEntry)
- {
- }
-
public void send(QueueEntry entry, boolean batch) throws AMQException
{
if (messages.contains(entry))
@@ -251,7 +250,7 @@ public class MockSubscription implements Subscription
{
}
- public void setStateListener(StateListener listener)
+ public void setStateListener(StateChangeListener<Subscription, State> listener)
{
this._listener = listener;
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 4b38b8a1a3..04510064de 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -31,20 +31,19 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private final QueueEntry _entry;
- private final Subscription_0_10 _sub;
+ private final SubscriptionTarget_0_10 _target;
- public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ public ExplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target)
{
_entry = entry;
- _sub = subscription_0_10;
+ _target = target;
}
public void onAccept()
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
{
- subscription.getSessionModel().acknowledge(subscription, _entry);
+ _target.getSessionModel().acknowledge(_target, _entry);
}
else
{
@@ -55,10 +54,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
{
- subscription.release(_entry, setRedelivered);
+ _target.release(_entry, setRedelivered);
}
else
{
@@ -68,10 +66,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
{
- subscription.reject(_entry);
+ _target.reject(_entry);
}
else
{
@@ -82,12 +79,8 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- return _entry.acquire(getSubscription());
+ return _entry.acquire(_target.getSubscription());
}
- private Subscription_0_10 getSubscription()
- {
- return _sub;
- }
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
index ce0155b789..0cdced728a 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
@@ -30,12 +30,12 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private final QueueEntry _entry;
- private Subscription_0_10 _sub;
+ private SubscriptionTarget_0_10 _target;
- public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ public ImplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target)
{
_entry = entry;
- _sub = subscription_0_10;
+ _target = target;
}
public void onAccept()
@@ -45,9 +45,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_target.getSubscription()))
{
- getSubscription().release(_entry, setRedelivered);
+ _target.release(_entry, setRedelivered);
}
else
{
@@ -57,9 +57,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_target.getSubscription()))
{
- getSubscription().reject(_entry);
+ _target.reject(_entry);
}
else
{
@@ -70,19 +70,15 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- boolean acquired = _entry.acquire(getSubscription());
+ boolean acquired = _entry.acquire(_target.getSubscription());
if(acquired)
{
- getSubscription().recordUnacknowledged(_entry);
+ _target.recordUnacknowledged(_entry);
}
return acquired;
}
- public Subscription_0_10 getSubscription()
- {
- return _sub;
- }
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index f5f2a8d43f..34a7f2e526 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -26,12 +26,12 @@ import org.apache.qpid.transport.Method;
public class MessageAcceptCompletionListener implements Method.CompletionListener
{
- private final Subscription_0_10 _sub;
+ private final SubscriptionTarget_0_10 _sub;
private final QueueEntry _entry;
private final ServerSession _session;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
{
super();
_sub = sub;
@@ -46,7 +46,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
{
_sub.restoreCredit(_entry);
}
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_sub.getSubscription()))
{
_session.acknowledge(_sub, _entry);
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index a15fea1200..6348510b09 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -282,8 +282,8 @@ public class ServerConnectionDelegate extends ServerDelegate
private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
{
final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
- final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subs)
+ final Collection<SubscriptionTarget_0_10> subs = ssn.getSubscriptions();
+ for (SubscriptionTarget_0_10 subscription_0_10 : subs)
{
subscription_0_10.stop();
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index bae5616042..6a5c69fed0 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -55,15 +55,14 @@ import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -77,6 +76,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
@@ -104,10 +104,10 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction()
+ private final Action<QueueEntry> _checkCapacityAction = new Action<QueueEntry>()
{
@Override
- public void onEnqueue(final QueueEntry entry)
+ public void performAction(final QueueEntry entry)
{
entry.getQueue().checkCapacity(ServerSession.this);
}
@@ -126,12 +126,6 @@ public class ServerSession extends Session
}
- public static interface Task
- {
- public void doTask(ServerSession session);
- }
-
-
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
@@ -142,9 +136,9 @@ public class ServerSession extends Session
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
+ private Map<String, SubscriptionTarget_0_10> _subscriptions = new ConcurrentHashMap<String, SubscriptionTarget_0_10>();
- private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
private final TransactionTimeoutHelper _transactionTimeoutHelper;
@@ -386,9 +380,9 @@ public class ServerSession extends Session
}
_messageDispositionListenerMap.clear();
- for (Task task : _taskList)
+ for (Action<ServerSession> task : _taskList)
{
- task.doTask(this);
+ task.performAction(this);
}
LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
@@ -405,7 +399,7 @@ public class ServerSession extends Session
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
+ public void acknowledge(final SubscriptionTarget_0_10 sub, final QueueEntry entry)
{
_transaction.dequeue(entry.getQueue(), entry.getMessage(),
new ServerTransaction.Action()
@@ -426,37 +420,28 @@ public class ServerSession extends Session
});
}
- public Collection<Subscription_0_10> getSubscriptions()
+ public Collection<SubscriptionTarget_0_10> getSubscriptions()
{
return _subscriptions.values();
}
- public void register(String destination, Subscription_0_10 sub)
+ public void register(String destination, SubscriptionTarget_0_10 sub)
{
_subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
}
- public Subscription_0_10 getSubscription(String destination)
+ public SubscriptionTarget_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
}
- public void unregister(Subscription_0_10 sub)
+ public void unregister(SubscriptionTarget_0_10 sub)
{
_subscriptions.remove(sub.getName());
try
{
sub.getSendLock();
- AMQQueue queue = sub.getQueue();
- if(queue != null)
- {
- queue.unregisterSubscription(sub);
- }
- }
- catch (AMQException e)
- {
- // TODO
- _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
+ sub.close();
}
finally
{
@@ -638,12 +623,12 @@ public class ServerSession extends Session
return getConnection().getAuthorizedSubject();
}
- public void addSessionCloseTask(Task task)
+ public void addSessionCloseTask(Action<ServerSession> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Task task)
+ public void removeSessionCloseTask(Action<ServerSession> task)
{
_taskList.remove(task);
}
@@ -829,8 +814,8 @@ public class ServerSession extends Session
void unregisterSubscriptions()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
+ for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
@@ -838,8 +823,8 @@ public class ServerSession extends Session
void stopSubscriptions()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
+ for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.stop();
}
@@ -848,58 +833,14 @@ public class ServerSession extends Session
public void receivedComplete()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
+ for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.flushCreditState(false);
}
awaitCommandCompletion();
}
- private class PostEnqueueAction implements ServerTransaction.Action
- {
-
- private final MessageReference<MessageTransferMessage> _reference;
- private final List<? extends BaseQueue> _queues;
- private final boolean _transactional;
-
- public PostEnqueueAction(List<? extends BaseQueue> queues, MessageTransferMessage message, final boolean transactional)
- {
- _reference = message.newReference();
- _transactional = transactional;
- _queues = queues;
- }
-
- public void postCommit()
- {
- for(int i = 0; i < _queues.size(); i++)
- {
- try
- {
- BaseQueue queue = _queues.get(i);
- queue.enqueue(_reference.getMessage(), _transactional, null);
- if(queue instanceof AMQQueue)
- {
- ((AMQQueue)queue).checkCapacity(ServerSession.this);
- }
-
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- _reference.release();
- }
-
- public void onRollback()
- {
- // NO-OP
- _reference.release();
- }
- }
-
public int getUnacknowledgedMessageCount()
{
return _messageDispositionListenerMap.size();
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index dcca696529..d3480e3223 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -45,6 +45,8 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.DelegatingSubscription;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -55,6 +57,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -214,9 +217,9 @@ public class ServerSessionDelegate extends SessionDelegate
ServerSession s = (ServerSession) session;
queue.setExclusiveOwningSession(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
if(queue.getExclusiveOwningSession() == session)
{
@@ -228,9 +231,9 @@ public class ServerSessionDelegate extends SessionDelegate
if(queue.getAuthorizationHolder() == null)
{
queue.setAuthorizationHolder(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
if(queue.getAuthorizationHolder() == session)
{
@@ -254,16 +257,21 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
- destination,
- method.getAcceptMode(),
- method.getAcquireMode(),
- MessageFlowMode.WINDOW,
- creditManager,
- filterManager,
- method.getArguments());
+ SubscriptionTarget_0_10 target = new SubscriptionTarget_0_10((ServerSession)session, destination,
+ method.getAcceptMode(),
+ method.getAcquireMode(),
+ MessageFlowMode.WINDOW,
+ creditManager,
+ filterManager,
+ method.getArguments());
- ((ServerSession)session).register(destination, sub);
+ Subscription sub = new DelegatingSubscription<SubscriptionTarget_0_10>(filterManager, MessageTransferMessage.class,
+ method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED,
+ method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT,destination,false,target);
+
+ target.setSubscription(sub);
+
+ ((ServerSession)session).register(destination, target);
try
{
queue.registerSubscription(sub, method.getExclusive());
@@ -385,7 +393,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = method.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -393,7 +401,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- AMQQueue queue = sub.getQueue();
+ AMQQueue queue = sub.getSubscription().getQueue();
((ServerSession)session).unregister(sub);
if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
{
@@ -407,7 +415,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = method.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1249,9 +1257,9 @@ public class ServerSessionDelegate extends SessionDelegate
if (autoDelete && exclusive)
{
final AMQQueue q = queue;
- final ServerSession.Task deleteQueueTask = new ServerSession.Task()
+ final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
try
{
@@ -1265,9 +1273,9 @@ public class ServerSessionDelegate extends SessionDelegate
};
final ServerSession s = (ServerSession) session;
s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue) throws AMQException
+ public void performAction(AMQQueue queue)
{
s.removeSessionCloseTask(deleteQueueTask);
}
@@ -1276,9 +1284,9 @@ public class ServerSessionDelegate extends SessionDelegate
if (exclusive)
{
final AMQQueue q = queue;
- final ServerSession.Task removeExclusive = new ServerSession.Task()
+ final Action<ServerSession> removeExclusive = new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
q.setAuthorizationHolder(null);
q.setExclusiveOwningSession(null);
@@ -1287,9 +1295,9 @@ public class ServerSessionDelegate extends SessionDelegate
final ServerSession s = (ServerSession) session;
q.setExclusiveOwningSession(s);
s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue) throws AMQException
+ public void performAction(AMQQueue queue)
{
s.removeSessionCloseTask(removeExclusive);
}
@@ -1461,7 +1469,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = sfm.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1478,7 +1486,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = stop.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1496,7 +1504,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = flow.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
new file mode 100644
index 0000000000..c151eddb46
--- /dev/null
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
@@ -0,0 +1,582 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.*;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener
+{
+
+ private static final Option[] BATCHED = new Option[] { Option.BATCH };
+
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+ private FlowCreditManager_0_10 _creditManager;
+
+ private final MessageAcceptMode _acceptMode;
+ private final MessageAcquireMode _acquireMode;
+ private MessageFlowMode _flowMode;
+ private final ServerSession _session;
+ private final AtomicBoolean _stopped = new AtomicBoolean(true);
+ private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
+
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+
+ private final Map<String, Object> _arguments;
+ private int _deferredMessageCredit;
+ private long _deferredSizeCredit;
+ private Subscription _subscription;
+
+
+ public SubscriptionTarget_0_10(ServerSession session,
+ String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ MessageFlowMode flowMode,
+ FlowCreditManager_0_10 creditManager,
+ FilterManager filters,
+ Map<String, Object> arguments)
+ {
+ super(State.SUSPENDED);
+ _session = session;
+ _postIdSettingAction = new AddMessageDispositionListenerAction(session);
+ _acceptMode = acceptMode;
+ _acquireMode = acquireMode;
+ _creditManager = creditManager;
+ _flowMode = flowMode;
+ _creditManager.addStateListener(this);
+ _arguments = arguments == null ? Collections.<String, Object> emptyMap() :
+ Collections.<String, Object> unmodifiableMap(arguments);
+
+ }
+
+
+ public void setSubscription(Subscription subscription)
+ {
+ _subscription = subscription;
+ }
+
+ public Subscription getSubscription()
+ {
+ return _subscription;
+ }
+
+ public boolean isSuspended()
+ {
+ return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
+ }
+
+ public boolean close()
+ {
+ boolean closed = false;
+ State state = getState();
+
+ getSubscription().getSendLock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
+ {
+ closed = updateState(state, State.CLOSED);
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ }
+ _creditManager.removeListener(this);
+ }
+ finally
+ {
+ getSubscription().releaseSendLock();
+ }
+
+ return closed;
+
+ }
+
+ public void creditStateChanged(boolean hasCredit)
+ {
+
+ if(hasCredit)
+ {
+ if(!updateState(State.SUSPENDED, State.ACTIVE))
+ {
+ // this is a hack to get round the issue of increasing bytes credit
+ getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ }
+ }
+ else
+ {
+ updateState(State.ACTIVE, State.SUSPENDED);
+ }
+ }
+
+ public String getName()
+ {
+ return getSubscription().getName();
+ }
+
+ public void getSendLock()
+ {
+ getSubscription().getSendLock();
+ }
+
+ public void releaseSendLock()
+ {
+ getSubscription().releaseSendLock();
+ }
+
+
+ public static class AddMessageDispositionListenerAction implements Runnable
+ {
+ private MessageTransfer _xfr;
+ private ServerSession.MessageDispositionChangeListener _action;
+ private ServerSession _session;
+
+ public AddMessageDispositionListenerAction(ServerSession session)
+ {
+ _session = session;
+ }
+
+ public void setXfr(MessageTransfer xfr)
+ {
+ _xfr = xfr;
+ }
+
+ public void setAction(ServerSession.MessageDispositionChangeListener action)
+ {
+ _action = action;
+ }
+
+ public void run()
+ {
+ if(_action != null)
+ {
+ _session.onMessageDispositionChange(_xfr, _action);
+ }
+ }
+ }
+
+ private final AddMessageDispositionListenerAction _postIdSettingAction;
+
+ public void send(final QueueEntry entry, boolean batch) throws AMQException
+ {
+ ServerMessage serverMsg = entry.getMessage();
+
+
+ MessageTransfer xfr;
+
+ DeliveryProperties deliveryProps;
+ MessageProperties messageProps = null;
+
+ MessageTransferMessage msg;
+
+ if(serverMsg instanceof MessageTransferMessage)
+ {
+
+ msg = (MessageTransferMessage) serverMsg;
+
+ }
+ else
+ {
+ MessageConverter converter =
+ MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
+
+
+ msg = (MessageTransferMessage) converter.convert(serverMsg, _session.getVirtualHost());
+ }
+ DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
+ messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
+
+ deliveryProps = new DeliveryProperties();
+ if(origDeliveryProps != null)
+ {
+ if(origDeliveryProps.hasDeliveryMode())
+ {
+ deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+ }
+ if(origDeliveryProps.hasExchange())
+ {
+ deliveryProps.setExchange(origDeliveryProps.getExchange());
+ }
+ if(origDeliveryProps.hasExpiration())
+ {
+ deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+ }
+ if(origDeliveryProps.hasPriority())
+ {
+ deliveryProps.setPriority(origDeliveryProps.getPriority());
+ }
+ if(origDeliveryProps.hasRoutingKey())
+ {
+ deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+ }
+ if(origDeliveryProps.hasTimestamp())
+ {
+ deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
+ }
+ if(origDeliveryProps.hasTtl())
+ {
+ deliveryProps.setTtl(origDeliveryProps.getTtl());
+ }
+
+
+ }
+
+ deliveryProps.setRedelivered(entry.isRedelivered());
+
+ Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
+
+
+ xfr = batch ? new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+ : new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody());
+
+ if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
+ {
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ }
+ else if(_flowMode == MessageFlowMode.WINDOW)
+ {
+ xfr.setCompletionListener(new Method.CompletionListener()
+ {
+ public void onComplete(Method method)
+ {
+ deferredAddCredit(1, entry.getSize());
+ }
+ });
+ }
+
+
+ _postIdSettingAction.setXfr(xfr);
+ if(_acceptMode == MessageAcceptMode.EXPLICIT)
+ {
+ _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
+ }
+ else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
+ {
+ _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
+ }
+ else
+ {
+ _postIdSettingAction.setAction(null);
+ }
+
+
+ _session.sendMessage(xfr, _postIdSettingAction);
+ entry.incrementDeliveryCount();
+ if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ forceDequeue(entry, false);
+ }
+ else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ recordUnacknowledged(entry);
+ }
+
+ }
+
+ void recordUnacknowledged(QueueEntry entry)
+ {
+ _unacknowledgedCount.incrementAndGet();
+ _unacknowledgedBytes.addAndGet(entry.getSize());
+ }
+
+ private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
+ {
+ _deferredMessageCredit += deferredMessageCredit;
+ _deferredSizeCredit += deferredSizeCredit;
+
+ }
+
+ public void flushCreditState(boolean strict)
+ {
+ if(strict || !isSuspended() || _deferredMessageCredit >= 200
+ || !(_creditManager instanceof WindowCreditManager)
+ || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+ {
+ _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+ _deferredMessageCredit = 0;
+ _deferredSizeCredit = 0l;
+ }
+ }
+
+ private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+ {
+ AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore());
+ dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ if (restoreCredit)
+ {
+ restoreCredit(entry);
+ }
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+
+ void reject(final QueueEntry entry)
+ {
+ entry.setRedelivered();
+ entry.routeToAlternate(null, null);
+ if(entry.isAcquiredBy(getSubscription()))
+ {
+ entry.delete();
+ }
+ }
+
+ void release(final QueueEntry entry, final boolean setRedelivered)
+ {
+ if (setRedelivered)
+ {
+ entry.setRedelivered();
+ }
+
+ if (getSessionModel().isClosing() || !setRedelivered)
+ {
+ entry.decrementDeliveryCount();
+ }
+
+ if (isMaxDeliveryLimitReached(entry))
+ {
+ sendToDLQOrDiscard(entry);
+ }
+ else
+ {
+ entry.release();
+ }
+ }
+
+ protected void sendToDLQOrDiscard(QueueEntry entry)
+ {
+ final LogActor logActor = CurrentActor.get();
+ final ServerMessage msg = entry.getMessage();
+
+ int requeues = entry.routeToAlternate(new Action<QueueEntry>()
+ {
+ @Override
+ public void performAction(final QueueEntry requeueEntry)
+ {
+ logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getQueue().getName()));
+ }
+ }, null);
+
+ if (requeues == 0)
+ {
+ final AMQQueue queue = entry.getQueue();
+ final Exchange alternateExchange = queue.getAlternateExchange();
+
+ if(alternateExchange != null)
+ {
+ logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ alternateExchange.getName()));
+ }
+ else
+ {
+ logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ queue.getName(),
+ msg.getRoutingKey()));
+ }
+ }
+ }
+
+ private boolean isMaxDeliveryLimitReached(QueueEntry entry)
+ {
+ final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount();
+ return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
+ }
+
+ public void queueDeleted()
+ {
+ _deleted.set(true);
+ }
+
+ public boolean allocateCredit(QueueEntry entry)
+ {
+ return _creditManager.useCreditForMessage(entry.getMessage().getSize());
+ }
+
+ public void restoreCredit(QueueEntry queueEntry)
+ {
+ _creditManager.restoreCredit(1, queueEntry.getSize());
+ }
+
+ public FlowCreditManager_0_10 getCreditManager()
+ {
+ return _creditManager;
+ }
+
+
+ public void stop()
+ {
+ try
+ {
+ getSubscription().getSendLock();
+
+ updateState(State.ACTIVE, State.SUSPENDED);
+ _stopped.set(true);
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+ creditManager.clearCredit();
+ }
+ finally
+ {
+ getSubscription().releaseSendLock();
+ }
+ }
+
+ public void addCredit(MessageCreditUnit unit, long value)
+ {
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+
+ switch (unit)
+ {
+ case MESSAGE:
+
+ creditManager.addCredit(value, 0L);
+ break;
+ case BYTE:
+ creditManager.addCredit(0l, value);
+ break;
+ }
+
+ _stopped.set(false);
+
+ if(creditManager.hasCredit())
+ {
+ updateState(State.SUSPENDED, State.ACTIVE);
+ }
+
+ }
+
+ public void setFlowMode(MessageFlowMode flowMode)
+ {
+
+
+ _creditManager.removeListener(this);
+
+ switch(flowMode)
+ {
+ case CREDIT:
+ _creditManager = new CreditCreditManager(0l,0l);
+ break;
+ case WINDOW:
+ _creditManager = new WindowCreditManager(0l,0l);
+ break;
+ default:
+ throw new RuntimeException("Unknown message flow mode: " + flowMode);
+ }
+ _flowMode = flowMode;
+ updateState(State.ACTIVE, State.SUSPENDED);
+
+ _creditManager.addStateListener(this);
+
+ }
+
+ public boolean isStopped()
+ {
+ return _stopped.get();
+ }
+
+ public void acknowledge(QueueEntry entry)
+ {
+ // TODO Fix Store Context / cleanup
+ if(entry.isAcquiredBy(getSubscription()))
+ {
+ _unacknowledgedBytes.addAndGet(-entry.getSize());
+ _unacknowledgedCount.decrementAndGet();
+ entry.delete();
+ }
+ }
+
+ public void flush() throws AMQException
+ {
+ flushCreditState(true);
+ getSubscription().flush();
+ stop();
+ }
+
+ public ServerSession getSessionModel()
+ {
+ return _session;
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return _arguments;
+ }
+
+ public void queueEmpty()
+ {
+ }
+
+ public void flushBatched()
+ {
+ _session.getConnection().flush();
+ }
+
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
+}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
deleted file mode 100644
index 357b565365..0000000000
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ /dev/null
@@ -1,944 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Struct;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-
-import java.text.MessageFormat;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject
-{
- private final long _subscriptionID;
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
-
- private static final Option[] BATCHED = new Option[] { Option.BATCH };
-
- private final Lock _stateChangeLock = new ReentrantLock();
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private volatile AMQQueue.Context _queueContext;
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
-
- private FlowCreditManager_0_10 _creditManager;
-
- private StateListener _stateListener = new StateListener()
- {
-
- public void stateChange(Subscription sub, State oldState, State newState)
- {
- CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
- }
- };
- private AMQQueue _queue;
- private final String _destination;
- private boolean _noLocal;
- private final FilterManager _filters;
- private final MessageAcceptMode _acceptMode;
- private final MessageAcquireMode _acquireMode;
- private MessageFlowMode _flowMode;
- private final ServerSession _session;
- private final AtomicBoolean _stopped = new AtomicBoolean(true);
- private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
-
- private LogActor _logActor;
- private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private String _traceExclude;
- private String _trace;
- private final long _createTime = System.currentTimeMillis();
- private final AtomicLong _deliveredCount = new AtomicLong(0);
- private final AtomicLong _deliveredBytes = new AtomicLong(0);
- private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
- private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-
- private final Map<String, Object> _arguments;
- private int _deferredMessageCredit;
- private long _deferredSizeCredit;
-
-
- public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode,
- MessageFlowMode flowMode,
- FlowCreditManager_0_10 creditManager,
- FilterManager filters,Map<String, Object> arguments)
- {
- _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
- _session = session;
- _postIdSettingAction = new AddMessageDispositionListenerAction(session);
- _destination = destination;
- _acceptMode = acceptMode;
- _acquireMode = acquireMode;
- _creditManager = creditManager;
- _flowMode = flowMode;
- _filters = filters;
- _creditManager.addStateListener(this);
- _arguments = arguments == null ? Collections.<String, Object> emptyMap() :
- Collections.<String, Object> unmodifiableMap(arguments);
- _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
-
- }
-
- public void setNoLocal(boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public void setQueue(AMQQueue queue, boolean exclusive)
- {
- if(getQueue() != null)
- {
- throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
- }
- _queue = queue;
-
- _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES);
- _trace = (String) queue.getAttribute(Queue.FEDERATION_ID);
- String filterLogString = null;
-
- _logActor = GenericActor.getInstance(this);
- if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, this, SubscriptionMessages.CREATE_LOG_HIERARCHY))
- {
- filterLogString = getFilterLogString();
- CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
- filterLogString.length() > 0));
- }
- }
-
- public String getConsumerName()
- {
- return _destination;
- }
-
- public boolean isSuspended()
- {
- return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
- }
-
- public boolean hasInterest(QueueEntry entry)
- {
-
-
-
- //check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
- {
-
- return false;
- }
-
- if (entry.getMessage() instanceof MessageTransferMessage)
- {
- if(_noLocal)
- {
- Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
- if (connectionRef != null && connectionRef == _session.getReference())
- {
- return false;
- }
- }
- }
- else
- {
- // no interest in messages we can't convert
- if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), MessageTransferMessage.class)==null)
- {
- return false;
- }
- }
-
-
- return checkFilters(entry);
-
-
- }
-
- private boolean checkFilters(QueueEntry entry)
- {
- return (_filters == null) || _filters.allAllow(entry.asFilterable());
- }
-
- public boolean isClosed()
- {
- return getState() == State.CLOSED;
- }
-
- public boolean isBrowser()
- {
- return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
- }
-
- public boolean seesRequeues()
- {
- return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT;
- }
-
- public void close()
- {
- boolean closed = false;
- State state = getState();
-
- _stateChangeLock.lock();
- try
- {
- while(!closed && state != State.CLOSED)
- {
- closed = _state.compareAndSet(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
- else
- {
- _stateListener.stateChange(this,state, State.CLOSED);
- }
- }
- _creditManager.removeListener(this);
- CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
- }
- finally
- {
- _stateChangeLock.unlock();
- }
-
-
-
- }
-
- public Long getDelivered()
- {
- return _deliveredCount.get();
- }
-
- public void creditStateChanged(boolean hasCredit)
- {
-
- if(hasCredit)
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- else
- {
- // this is a hack to get round the issue of increasing bytes credit
- _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
- }
- }
- else
- {
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- }
- }
-
-
- public static class AddMessageDispositionListenerAction implements Runnable
- {
- private MessageTransfer _xfr;
- private ServerSession.MessageDispositionChangeListener _action;
- private ServerSession _session;
-
- public AddMessageDispositionListenerAction(ServerSession session)
- {
- _session = session;
- }
-
- public void setXfr(MessageTransfer xfr)
- {
- _xfr = xfr;
- }
-
- public void setAction(ServerSession.MessageDispositionChangeListener action)
- {
- _action = action;
- }
-
- public void run()
- {
- if(_action != null)
- {
- _session.onMessageDispositionChange(_xfr, _action);
- }
- }
- }
-
- private final AddMessageDispositionListenerAction _postIdSettingAction;
-
- public void send(final QueueEntry entry, boolean batch) throws AMQException
- {
- ServerMessage serverMsg = entry.getMessage();
-
-
- MessageTransfer xfr;
-
- DeliveryProperties deliveryProps;
- MessageProperties messageProps = null;
-
- MessageTransferMessage msg;
-
- if(serverMsg instanceof MessageTransferMessage)
- {
-
- msg = (MessageTransferMessage) serverMsg;
-
- }
- else
- {
- MessageConverter converter =
- MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
-
-
- msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost());
- }
- DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
- messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
-
- deliveryProps = new DeliveryProperties();
- if(origDeliveryProps != null)
- {
- if(origDeliveryProps.hasDeliveryMode())
- {
- deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
- }
- if(origDeliveryProps.hasExchange())
- {
- deliveryProps.setExchange(origDeliveryProps.getExchange());
- }
- if(origDeliveryProps.hasExpiration())
- {
- deliveryProps.setExpiration(origDeliveryProps.getExpiration());
- }
- if(origDeliveryProps.hasPriority())
- {
- deliveryProps.setPriority(origDeliveryProps.getPriority());
- }
- if(origDeliveryProps.hasRoutingKey())
- {
- deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
- }
- if(origDeliveryProps.hasTimestamp())
- {
- deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
- }
- if(origDeliveryProps.hasTtl())
- {
- deliveryProps.setTtl(origDeliveryProps.getTtl());
- }
-
-
- }
-
- deliveryProps.setRedelivered(entry.isRedelivered());
-
- if(_trace != null && messageProps == null)
- {
- messageProps = new MessageProperties();
- }
-
- Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
-
-
- xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
- : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
-
- boolean excludeDueToFederation = false;
-
- if(_trace != null)
- {
- if(!messageProps.hasApplicationHeaders())
- {
- messageProps.setApplicationHeaders(new HashMap<String,Object>());
- }
- Map<String,Object> appHeaders = messageProps.getApplicationHeaders();
- String trace = (String) appHeaders.get("x-qpid.trace");
- if(trace == null)
- {
- trace = _trace;
- }
- else
- {
- if(_traceExclude != null)
- {
- excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude);
- }
- trace+=","+_trace;
- }
- appHeaders.put("x-qpid.trace",trace);
- }
-
- if(!excludeDueToFederation)
- {
- if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
- {
- xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
- }
- else if(_flowMode == MessageFlowMode.WINDOW)
- {
- xfr.setCompletionListener(new Method.CompletionListener()
- {
- public void onComplete(Method method)
- {
- deferredAddCredit(1, entry.getSize());
- }
- });
- }
-
-
- _postIdSettingAction.setXfr(xfr);
- if(_acceptMode == MessageAcceptMode.EXPLICIT)
- {
- _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
- }
- else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
- {
- _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
- }
- else
- {
- _postIdSettingAction.setAction(null);
- }
-
-
- _session.sendMessage(xfr, _postIdSettingAction);
- entry.incrementDeliveryCount();
- _deliveredCount.incrementAndGet();
- _deliveredBytes.addAndGet(entry.getSize());
- if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
- {
- forceDequeue(entry, false);
- }
- else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
- {
- recordUnacknowledged(entry);
- }
- }
- else
- {
- forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW);
-
- }
- }
-
- void recordUnacknowledged(QueueEntry entry)
- {
- _unacknowledgedCount.incrementAndGet();
- _unacknowledgedBytes.addAndGet(entry.getSize());
- }
-
- private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
- {
- _deferredMessageCredit += deferredMessageCredit;
- _deferredSizeCredit += deferredSizeCredit;
-
- }
-
- public void flushCreditState(boolean strict)
- {
- if(strict || !isSuspended() || _deferredMessageCredit >= 200
- || !(_creditManager instanceof WindowCreditManager)
- || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
- {
- _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
- _deferredMessageCredit = 0;
- _deferredSizeCredit = 0l;
- }
- }
-
- private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
- {
- AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
- dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- if (restoreCredit)
- {
- restoreCredit(entry);
- }
- entry.delete();
- }
-
- public void onRollback()
- {
-
- }
- });
- }
-
- void reject(final QueueEntry entry)
- {
- entry.setRedelivered();
- entry.routeToAlternate(null, null);
- if(entry.isAcquiredBy(this))
- {
- entry.delete();
- }
- }
-
- void release(final QueueEntry entry, final boolean setRedelivered)
- {
- if (setRedelivered)
- {
- entry.setRedelivered();
- }
-
- if (getSessionModel().isClosing() || !setRedelivered)
- {
- entry.decrementDeliveryCount();
- }
-
- if (isMaxDeliveryLimitReached(entry))
- {
- sendToDLQOrDiscard(entry);
- }
- else
- {
- entry.release();
- }
- }
-
- protected void sendToDLQOrDiscard(QueueEntry entry)
- {
- final LogActor logActor = CurrentActor.get();
- final ServerMessage msg = entry.getMessage();
-
- int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction()
- {
- @Override
- public void onEnqueue(final QueueEntry requeueEntry)
- {
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getQueue().getName()));
- }
- }, null);
-
- if (requeues == 0)
- {
- final AMQQueue queue = entry.getQueue();
- final Exchange alternateExchange = queue.getAlternateExchange();
-
- if(alternateExchange != null)
- {
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
- alternateExchange.getName()));
- }
- else
- {
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
- queue.getName(),
- msg.getRoutingKey()));
- }
- }
- }
-
- private boolean isMaxDeliveryLimitReached(QueueEntry entry)
- {
- final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount();
- return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
- }
-
- public void queueDeleted(AMQQueue queue)
- {
- _deleted.set(true);
- }
-
- public boolean wouldSuspend(QueueEntry entry)
- {
- return !_creditManager.useCreditForMessage(entry.getMessage().getSize());
- }
-
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
-
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public void restoreCredit(QueueEntry queueEntry)
- {
- _creditManager.restoreCredit(1, queueEntry.getSize());
- }
-
- public void onDequeue(QueueEntry queueEntry)
- {
- // no-op for 0-10, credit restored by completing command.
- }
-
- public void releaseQueueEntry(QueueEntry queueEntry)
- {
- // no-op for 0-10, credit restored by completing command.
- }
-
- public void setStateListener(StateListener listener)
- {
- _stateListener = listener;
- }
-
- public State getState()
- {
- return _state.get();
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context queueContext)
- {
- _queueContext = queueContext;
- }
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public void set(String key, Object value)
- {
- _properties.put(key, value);
- }
-
- public Object get(String key)
- {
- return _properties.get(key);
- }
-
-
- public FlowCreditManager_0_10 getCreditManager()
- {
- return _creditManager;
- }
-
-
- public void stop()
- {
- try
- {
- getSendLock();
-
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- _stopped.set(true);
- FlowCreditManager_0_10 creditManager = getCreditManager();
- creditManager.clearCredit();
- }
- finally
- {
- releaseSendLock();
- }
- }
-
- public void addCredit(MessageCreditUnit unit, long value)
- {
- FlowCreditManager_0_10 creditManager = getCreditManager();
-
- switch (unit)
- {
- case MESSAGE:
-
- creditManager.addCredit(value, 0L);
- break;
- case BYTE:
- creditManager.addCredit(0l, value);
- break;
- }
-
- _stopped.set(false);
-
- if(creditManager.hasCredit())
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- }
-
- }
-
- public void setFlowMode(MessageFlowMode flowMode)
- {
-
-
- _creditManager.removeListener(this);
-
- switch(flowMode)
- {
- case CREDIT:
- _creditManager = new CreditCreditManager(0l,0l);
- break;
- case WINDOW:
- _creditManager = new WindowCreditManager(0l,0l);
- break;
- default:
- throw new RuntimeException("Unknown message flow mode: " + flowMode);
- }
- _flowMode = flowMode;
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
-
- _creditManager.addStateListener(this);
-
- }
-
- public boolean isStopped()
- {
- return _stopped.get();
- }
-
- public boolean acquires()
- {
- return _acquireMode == MessageAcquireMode.PRE_ACQUIRED;
- }
-
- public void acknowledge(QueueEntry entry)
- {
- // TODO Fix Store Context / cleanup
- if(entry.isAcquiredBy(this))
- {
- _unacknowledgedBytes.addAndGet(-entry.getSize());
- _unacknowledgedCount.decrementAndGet();
- entry.delete();
- }
- }
-
- public void flush() throws AMQException
- {
- flushCreditState(true);
- _queue.flushSubscription(this);
- stop();
- }
-
- public long getSubscriptionID()
- {
- return _subscriptionID;
- }
-
- public LogActor getLogActor()
- {
- return _logActor;
- }
-
- public boolean isTransient()
- {
- return false;
- }
-
- public ServerSession getSessionModel()
- {
- return _session;
- }
-
- public boolean isBrowsing()
- {
- return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
- }
-
- public boolean isExclusive()
- {
- return getQueue().hasExclusiveSubscriber();
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
-
- public boolean isExplicitAcknowledge()
- {
- return _acceptMode == MessageAcceptMode.EXPLICIT;
- }
-
- public String getCreditMode()
- {
- return _flowMode.toString();
- }
-
- public String getName()
- {
- return _destination;
- }
-
- public Map<String, Object> getArguments()
- {
- return _arguments;
- }
-
- public boolean isSessionTransactional()
- {
- return _session.isTransactional();
- }
-
- public void queueEmpty()
- {
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public String toLogString()
- {
- String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
- _queue.getName());
- String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
- // queueString is "vh(/{0})/qu({1}) " so need to trim
- + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] ";
- return result;
- }
-
- private String getFilterLogString()
- {
- StringBuilder filterLogString = new StringBuilder();
- String delimiter = ", ";
- boolean hasEntries = false;
- if (_filters != null && _filters.hasFilters())
- {
- filterLogString.append(_filters.toString());
- hasEntries = true;
- }
-
- if (isBrowser())
- {
- if (hasEntries)
- {
- filterLogString.append(delimiter);
- }
- filterLogString.append("Browser");
- hasEntries = true;
- }
-
- if (isDurable())
- {
- if (hasEntries)
- {
- filterLogString.append(delimiter);
- }
- filterLogString.append("Durable");
- hasEntries = true;
- }
-
- return filterLogString.toString();
- }
-
- public LogSubject getLogSubject()
- {
- return (LogSubject) this;
- }
-
-
- public void flushBatched()
- {
- _session.getConnection().flush();
- }
-
- public long getBytesOut()
- {
- return _deliveredBytes.longValue();
- }
-
- public long getMessagesOut()
- {
- return _deliveredCount.longValue();
- }
-
- public long getUnacknowledgedBytes()
- {
- return _unacknowledgedBytes.longValue();
- }
-
- public long getUnacknowledgedMessages()
- {
- return _unacknowledgedCount.longValue();
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c6d4151628..9df1e7b89b 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -85,6 +85,7 @@ import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
@@ -1256,7 +1257,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
BaseQueue queue = _destinationQueues.get(i);
- BaseQueue.PostEnqueueAction action;
+ Action<QueueEntry> action;
if(immediate)
{
@@ -1267,7 +1268,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
action = null;
}
- queue.enqueue(message, isTransactional(), action);
+ queue.enqueue(message, action);
if(queue instanceof AMQQueue)
{
@@ -1295,14 +1296,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- private class ImmediateAction implements BaseQueue.PostEnqueueAction
+ private class ImmediateAction implements Action<QueueEntry>
{
public ImmediateAction()
{
}
- public void onEnqueue(QueueEntry entry)
+ public void performAction(QueueEntry entry)
{
AMQQueue queue = entry.getQueue();
@@ -1310,11 +1311,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
ServerTransaction txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
final AMQMessage message = (AMQMessage) entry.getMessage();
- txn.dequeue(queue, entry.getMessage(),
- new MessageAcknowledgeAction(entries)
+ MessageReference ref = message.newReference();
+ entry.delete();
+ txn.dequeue(queue, message,
+ new ServerTransaction.Action()
{
@Override
public void postCommit()
@@ -1336,11 +1337,17 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
throw new RuntimeException(e);
}
- super.postCommit();
+ }
+
+ @Override
+ public void onRollback()
+ {
+
}
}
);
txn.commit();
+ ref.release();
}
@@ -1352,10 +1359,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction
+ private final class CapacityCheckAction implements Action<QueueEntry>
{
@Override
- public void onEnqueue(final QueueEntry entry)
+ public void performAction(final QueueEntry entry)
{
AMQQueue queue = entry.getQueue();
queue.checkCapacity(AMQChannel.this);
@@ -1398,10 +1405,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
try
{
- for(QueueEntry entry : _ackedMessages)
- {
- entry.release();
- }
+ for(QueueEntry entry : _ackedMessages)
+ {
+ entry.release();
+ }
}
finally
{
@@ -1576,10 +1583,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
- int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>()
{
@Override
- public void onEnqueue(final QueueEntry requeueEntry)
+ public void performAction(final QueueEntry requeueEntry)
{
_actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
requeueEntry.getQueue().getName()));
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index c7a84fa3b6..8d45160848 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -1678,7 +1678,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
props,
_channelId,
deliveryTag,
- ((SubscriptionImpl)sub).getConsumerTag());
+ new AMQShortString(sub.getName()));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
index 93b51a0567..05f35748ee 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
@@ -25,10 +25,14 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.DelegatingSubscription;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
public class SubscriptionFactoryImpl implements SubscriptionFactory
{
@@ -47,27 +51,30 @@ public class SubscriptionFactoryImpl implements SubscriptionFactory
return createSubscription(channel, protocolSession, consumerTag, acks, filters,
- noLocal,
- creditManager,
- clientMethod,
- recordMethod
- );
+ noLocal,
+ creditManager,
+ clientMethod,
+ recordMethod
+ );
}
+
public Subscription createSubscription(final AMQChannel channel,
- final AMQProtocolSession protocolSession,
- final AMQShortString consumerTag,
- final boolean acks,
- final FieldTable filters,
- final boolean noLocal,
- final FlowCreditManager creditManager,
- final ClientDeliveryMethod clientMethod,
- final RecordDeliveryMethod recordMethod
- )
+ final AMQProtocolSession protocolSession,
+ final AMQShortString consumerTag,
+ final boolean acks,
+ final FieldTable filters,
+ final boolean noLocal,
+ final FlowCreditManager creditManager,
+ final ClientDeliveryMethod clientMethod,
+ final RecordDeliveryMethod recordMethod
+ )
throws AMQException
{
boolean isBrowser;
-
+ SubscriptionTarget_0_8 target;
+ Subscription subscription;
+
if (filters != null)
{
Boolean isBrowserObj = (Boolean) filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
@@ -78,30 +85,56 @@ public class SubscriptionFactoryImpl implements SubscriptionFactory
isBrowser = false;
}
+ final FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+ boolean acquires;
+ boolean seesReuques;
+ boolean isTransient;
if(isBrowser)
{
- return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ target = new SubscriptionTarget_0_8.BrowserSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod);
+ acquires = false;
+ seesReuques = false;
+ isTransient = true;
}
else if(acks)
{
- return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ target = new SubscriptionTarget_0_8.AckSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod);
+ acquires = true;
+ seesReuques = true;
+ isTransient = false;
}
else
{
- return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ target = new SubscriptionTarget_0_8.NoAckSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod);
+ acquires = true;
+ seesReuques = true;
+ isTransient = false;
}
+ subscription =
+ new DelegatingSubscription<SubscriptionTarget_0_8>(filterManager, AMQMessage.class, acquires, seesReuques, AMQShortString.toString(consumerTag),isTransient,target);
+ target.setSubscription(subscription);
+ return subscription;
}
- public SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(final AMQChannel channel,
- final AMQProtocolSession session,
- final AMQShortString consumerTag,
- final FieldTable filters,
- final boolean noLocal,
- final FlowCreditManager creditManager,
- final ClientDeliveryMethod deliveryMethod,
- final RecordDeliveryMethod recordMethod) throws AMQException
+
+
+ public Subscription createBasicGetNoAckSubscription(final AMQChannel channel,
+ final AMQProtocolSession session,
+ final AMQShortString consumerTag,
+ final FieldTable filters,
+ final boolean noLocal,
+ final FlowCreditManager creditManager,
+ final ClientDeliveryMethod deliveryMethod,
+ final RecordDeliveryMethod recordMethod) throws AMQException
{
- return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod);
+ SubscriptionTarget_0_8 target = new SubscriptionTarget_0_8.NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+
+ Subscription subscription;
+ final FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+ subscription =
+ new DelegatingSubscription<SubscriptionTarget_0_8>(filterManager, AMQMessage.class, true, true, AMQShortString.toString(consumerTag),true,target);
+ target.setSubscription(subscription);
+ return subscription;
}
public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
deleted file mode 100644
index 7c52fbe3b0..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
+++ /dev/null
@@ -1,858 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.filter.FilterManagerFactory;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.SubscriptionActor;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
- * that was given out by the broker and the channel id. <p/>
- */
-public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener
-{
-
- private StateListener _stateListener = new StateListener()
- {
-
- public void stateChange(Subscription sub, State oldState, State newState)
- {
-
- }
- };
-
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private volatile AMQQueue.Context _queueContext;
-
- private final ClientDeliveryMethod _deliveryMethod;
- private final RecordDeliveryMethod _recordMethod;
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
-
- private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
-
- private final Lock _stateChangeLock;
-
- private final long _subscriptionID;
- private LogSubject _logSubject;
- private LogActor _logActor;
- private final AtomicLong _deliveredCount = new AtomicLong(0);
- private final AtomicLong _deliveredBytes = new AtomicLong(0);
-
- private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
- private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-
- private long _createTime = System.currentTimeMillis();
-
-
- static final class BrowserSubscription extends SubscriptionImpl
- {
- public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- }
-
-
- public boolean isBrowser()
- {
- return true;
- }
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- *
- * @param entry
- * @param batch
- * @throws AMQException
- */
- @Override
- public void send(QueueEntry entry, boolean batch) throws AMQException
- {
- // We don't decrement the reference here as we don't want to consume the message
- // but we do want to send it to the client.
-
- synchronized (getChannel())
- {
- long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
- }
-
- }
-
- @Override
- public boolean wouldSuspend(QueueEntry msg)
- {
- return false;
- }
-
- }
-
- public static class NoAckSubscription extends SubscriptionImpl
- {
- private final AutoCommitTransaction _txn;
-
- public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- _txn = new AutoCommitTransaction(protocolSession.getVirtualHost().getMessageStore());
- }
-
-
- public boolean isBrowser()
- {
- return false;
- }
-
- @Override
- public boolean isExplicitAcknowledge()
- {
- return false;
- }
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- *
- * @param entry The message to send
- * @param batch
- * @throws AMQException
- */
- @Override
- public void send(QueueEntry entry, boolean batch) throws AMQException
- {
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
-
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- _txn.dequeue(getQueue(), entry.getMessage(), NOOP);
-
- ServerMessage message = entry.getMessage();
- MessageReference ref = message.newReference();
- InstanceProperties props = entry.getInstanceProperties();
- entry.delete();
-
- synchronized (getChannel())
- {
- getChannel().getProtocolSession().setDeferFlush(batch);
- long deliveryTag = getChannel().getNextDeliveryTag();
-
- sendToClient(message, props, deliveryTag);
-
- }
- ref.release();
-
-
- }
-
- @Override
- public boolean wouldSuspend(QueueEntry msg)
- {
- return false;
- }
-
- private static final ServerTransaction.Action NOOP =
- new ServerTransaction.Action()
- {
- @Override
- public void postCommit()
- {
- }
-
- @Override
- public void onRollback()
- {
- }
- };
- }
-
- /**
- * NoAck Subscription for use with BasicGet method.
- */
- public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
- {
- public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- }
-
- public boolean isTransient()
- {
- return true;
- }
-
- public boolean wouldSuspend(QueueEntry msg)
- {
- return !getCreditManager().useCreditForMessage(msg.getMessage().getSize());
- }
-
- }
-
- static final class AckSubscription extends SubscriptionImpl
- {
- public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- }
-
-
- public boolean isBrowser()
- {
- return false;
- }
-
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- *
- * @param entry The message to send
- * @param batch
- * @throws AMQException
- */
- @Override
- public void send(QueueEntry entry, boolean batch) throws AMQException
- {
-
-
- synchronized (getChannel())
- {
- getChannel().getProtocolSession().setDeferFlush(batch);
- long deliveryTag = getChannel().getNextDeliveryTag();
-
- addUnacknowledgedMessage(entry);
- recordMessageDelivery(entry, deliveryTag);
- sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
- entry.incrementDeliveryCount();
-
- }
- }
-
-
-
- }
-
-
- private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
-
- private final AMQChannel _channel;
-
- private final AMQShortString _consumerTag;
-
-
- private boolean _noLocal;
-
- private final FlowCreditManager _creditManager;
-
- private FilterManager _filters;
-
- private final Boolean _autoClose;
-
- private AMQQueue _queue;
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
-
-
-
- public SubscriptionImpl(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable arguments,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
- _channel = channel;
- _consumerTag = consumerTag;
-
- _creditManager = creditManager;
- creditManager.addStateListener(this);
-
- _noLocal = noLocal;
-
-
- _filters = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments));
-
- _deliveryMethod = deliveryMethod;
- _recordMethod = recordMethod;
-
-
- _stateChangeLock = new ReentrantLock();
-
-
- if (arguments != null)
- {
- Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
- if (autoClose != null)
- {
- _autoClose = (Boolean) autoClose;
- }
- else
- {
- _autoClose = false;
- }
- }
- else
- {
- _autoClose = false;
- }
-
- }
-
- public AMQSessionModel getSessionModel()
- {
- return _channel;
- }
-
- public Long getDelivered()
- {
- return _deliveredCount.get();
- }
-
- public synchronized void setQueue(AMQQueue queue, boolean exclusive)
- {
- if(getQueue() != null)
- {
- throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
- }
- _queue = queue;
-
- _logSubject = new SubscriptionLogSubject(this);
- _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
-
- if (CurrentActor.get().getRootMessageLogger().
- isMessageEnabled(CurrentActor.get(), _logSubject, SubscriptionMessages.CREATE_LOG_HIERARCHY))
- {
- // Get the string value of the filters
- String filterLogString = null;
- if (_filters != null && _filters.hasFilters())
- {
- filterLogString = _filters.toString();
- }
-
- if (isAutoClose())
- {
- if (filterLogString == null)
- {
- filterLogString = "";
- }
- else
- {
- filterLogString += ",";
- }
- filterLogString += "AutoClose";
- }
-
- if (isBrowser())
- {
- // We do not need to check for null here as all Browsers are AutoClose
- filterLogString +=",Browser";
- }
-
- CurrentActor.get().
- message(_logSubject,
- SubscriptionMessages.CREATE(filterLogString,
- queue.isDurable() && exclusive,
- filterLogString != null));
- }
- }
-
- public String toString()
- {
- String subscriber = "[channel=" + _channel +
- ", consumerTag=" + _consumerTag +
- ", session=" + getProtocolSession().getKey() ;
-
- return subscriber + "]";
- }
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- *
- * @param entry
- * @param batch
- * @throws AMQException
- */
- abstract public void send(QueueEntry entry, boolean batch) throws AMQException;
-
-
- public boolean isSuspended()
- {
- return !isActive() || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
- }
-
- /**
- * Callback indicating that a queue has been deleted.
- *
- * @param queue The queue to delete
- */
- public void queueDeleted(AMQQueue queue)
- {
- _deleted.set(true);
- }
-
- public boolean hasInterest(QueueEntry entry)
- {
- //check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Subscription:" + this + " rejected message:" + entry);
- }
- }
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- if (_noLocal)
- {
- AMQMessage message = (AMQMessage) entry.getMessage();
-
- final Object publisherReference = message.getConnectionReference();
-
- // We don't want local messages so check to see if message is one we sent
- Object localReference = getProtocolSession().getReference();
-
- if(publisherReference != null && publisherReference.equals(localReference))
- {
- return false;
- }
- }
- }
- else
- {
- // No interest in messages we can't convert to AMQMessage
- if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), AMQMessage.class)==null)
- {
- return false;
- }
- }
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("(" + this + ") checking filters for message (" + entry);
- }
- return checkFilters(entry);
-
- }
-
- private boolean checkFilters(QueueEntry msg)
- {
- return (_filters == null) || _filters.allAllow(msg.asFilterable());
- }
-
- public boolean isAutoClose()
- {
- return _autoClose;
- }
-
- public FlowCreditManager getCreditManager()
- {
- return _creditManager;
- }
-
-
- public void close()
- {
- boolean closed = false;
- State state = getState();
-
- _stateChangeLock.lock();
- try
- {
- while(!closed && state != State.CLOSED)
- {
- closed = _state.compareAndSet(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
- else
- {
- _stateListener.stateChange(this,state, State.CLOSED);
- }
- }
- _creditManager.removeListener(this);
- }
- finally
- {
- _stateChangeLock.unlock();
- }
- //Log Subscription closed
- CurrentActor.get().message(_logSubject, SubscriptionMessages.CLOSE());
- }
-
- public boolean isClosed()
- {
- return getState() == State.CLOSED;
- }
-
-
- public boolean wouldSuspend(QueueEntry msg)
- {
- return !_creditManager.useCreditForMessage(msg.getMessage().getSize());
- }
-
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public AMQChannel getChannel()
- {
- return _channel;
- }
-
- public AMQShortString getConsumerTag()
- {
- return _consumerTag;
- }
-
- public String getConsumerName()
- {
- return _consumerTag == null ? null : _consumerTag.asString();
- }
-
- public long getSubscriptionID()
- {
- return _subscriptionID;
- }
-
- public AMQProtocolSession getProtocolSession()
- {
- return _channel.getProtocolSession();
- }
-
- public LogActor getLogActor()
- {
- return _logActor;
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public void onDequeue(final QueueEntry queueEntry)
- {
- restoreCredit(queueEntry);
- }
-
- public void releaseQueueEntry(final QueueEntry queueEntry)
- {
- restoreCredit(queueEntry);
- }
-
- public void restoreCredit(final QueueEntry queueEntry)
- {
- _creditManager.restoreCredit(1, queueEntry.getSize());
- }
-
- public void creditStateChanged(boolean hasCredit)
- {
-
- if(hasCredit)
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- else
- {
- // this is a hack to get round the issue of increasing bytes credit
- _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
- }
- }
- else
- {
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- }
- CurrentActor.get().message(_logSubject,SubscriptionMessages.STATE(_state.get().toString()));
- }
-
- public State getState()
- {
- return _state.get();
- }
-
-
- public void setStateListener(final StateListener listener)
- {
- _stateListener = listener;
- }
-
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context context)
- {
- _queueContext = context;
- }
-
-
- protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
- throws AMQException
- {
- _deliveryMethod.deliverToClient(this, message, props, deliveryTag);
- _deliveredCount.incrementAndGet();
- _deliveredBytes.addAndGet(message.getSize());
- }
-
-
- protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag)
- {
- _recordMethod.recordMessageDelivery(this,entry,deliveryTag);
- }
-
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public void confirmAutoClose()
- {
- ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
- }
-
- public boolean acquires()
- {
- return !isBrowser();
- }
-
- public boolean seesRequeues()
- {
- return !isBrowser();
- }
-
- public boolean isTransient()
- {
- return false;
- }
-
- public void set(String key, Object value)
- {
- _properties.put(key, value);
- }
-
- public Object get(String key)
- {
- return _properties.get(key);
- }
-
-
- public void setNoLocal(boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
- abstract boolean isBrowser();
-
- public String getCreditMode()
- {
- return "WINDOW";
- }
-
- public boolean isBrowsing()
- {
- return isBrowser();
- }
-
- public boolean isExplicitAcknowledge()
- {
- return true;
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
- public boolean isExclusive()
- {
- return getQueue().hasExclusiveSubscriber();
- }
-
- public String getName()
- {
- return String.valueOf(_consumerTag);
- }
-
- public Map<String, Object> getArguments()
- {
- return null;
- }
-
- public boolean isSessionTransactional()
- {
- return _channel.isTransactional();
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public void queueEmpty() throws AMQException
- {
- if (isAutoClose())
- {
- _queue.unregisterSubscription(this);
-
- confirmAutoClose();
- }
- }
-
- public void flushBatched()
- {
- _channel.getProtocolSession().setDeferFlush(false);
-
- _channel.getProtocolSession().flushBatched();
- }
-
- public long getBytesOut()
- {
- return _deliveredBytes.longValue();
- }
-
- public long getMessagesOut()
- {
- return _deliveredCount.longValue();
- }
-
-
- protected void addUnacknowledgedMessage(QueueEntry entry)
- {
- final long size = entry.getSize();
- _unacknowledgedBytes.addAndGet(size);
- _unacknowledgedCount.incrementAndGet();
- entry.addStateChangeListener(new QueueEntry.StateChangeListener()
- {
- public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
- {
- if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
- {
- _unacknowledgedBytes.addAndGet(-size);
- _unacknowledgedCount.decrementAndGet();
- entry.removeStateChangeListener(this);
- }
- }
- });
- }
-
- public long getUnacknowledgedBytes()
- {
- return _unacknowledgedBytes.longValue();
- }
-
- public long getUnacknowledgedMessages()
- {
- return _unacknowledgedCount.longValue();
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
new file mode 100644
index 0000000000..6c91e6130e
--- /dev/null
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
@@ -0,0 +1,506 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
+ * that was given out by the broker and the channel id. <p/>
+ */
+public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener
+{
+
+ private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener =
+ new StateChangeListener<QueueEntry, QueueEntry.State>()
+ {
+ @Override
+ public void stateChanged(final QueueEntry entry,
+ final QueueEntry.State oldSate,
+ final QueueEntry.State newState)
+ {
+ if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
+ {
+ restoreCredit(entry);
+ }
+ entry.removeStateChangeListener(this);
+ }
+ };
+
+ private final ClientDeliveryMethod _deliveryMethod;
+ private final RecordDeliveryMethod _recordMethod;
+
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+ private Subscription _subscription;
+
+
+ static final class BrowserSubscription extends SubscriptionTarget_0_8
+ {
+ public BrowserSubscription(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag,
+ filters, creditManager, deliveryMethod, recordMethod);
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ *
+ * @param entry
+ * @param batch
+ * @throws org.apache.qpid.AMQException
+ */
+ @Override
+ public void send(QueueEntry entry, boolean batch) throws AMQException
+ {
+ // We don't decrement the reference here as we don't want to consume the message
+ // but we do want to send it to the client.
+
+ synchronized (getChannel())
+ {
+ long deliveryTag = getChannel().getNextDeliveryTag();
+ sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ }
+
+ }
+
+ @Override
+ public boolean allocateCredit(QueueEntry msg)
+ {
+ return true;
+ }
+
+ }
+
+ public static class NoAckSubscription extends SubscriptionTarget_0_8
+ {
+ private final AutoCommitTransaction _txn;
+
+ public NoAckSubscription(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+
+ _txn = new AutoCommitTransaction(channel.getVirtualHost().getMessageStore());
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ *
+ * @param entry The message to send
+ * @param batch
+ * @throws org.apache.qpid.AMQException
+ */
+ @Override
+ public void send(QueueEntry entry, boolean batch) throws AMQException
+ {
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
+
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ _txn.dequeue(getSubscription().getQueue(), entry.getMessage(), NOOP);
+
+ ServerMessage message = entry.getMessage();
+ MessageReference ref = message.newReference();
+ InstanceProperties props = entry.getInstanceProperties();
+ entry.delete();
+
+ synchronized (getChannel())
+ {
+ getChannel().getProtocolSession().setDeferFlush(batch);
+ long deliveryTag = getChannel().getNextDeliveryTag();
+
+ sendToClient(message, props, deliveryTag);
+
+ }
+ ref.release();
+
+
+ }
+
+ @Override
+ public boolean allocateCredit(QueueEntry msg)
+ {
+ return true;
+ }
+
+ private static final ServerTransaction.Action NOOP =
+ new ServerTransaction.Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ }
+
+ @Override
+ public void onRollback()
+ {
+ }
+ };
+ }
+
+ /**
+ * NoAck Subscription for use with BasicGet method.
+ */
+ public static final class GetNoAckSubscription extends SubscriptionTarget_0_8.NoAckSubscription
+ {
+ public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
+ public boolean allocateCredit(QueueEntry msg)
+ {
+ return getCreditManager().useCreditForMessage(msg.getMessage().getSize());
+ }
+
+ }
+
+ static final class AckSubscription extends SubscriptionTarget_0_8
+ {
+ public AckSubscription(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ *
+ * @param entry The message to send
+ * @param batch
+ * @throws org.apache.qpid.AMQException
+ */
+ @Override
+ public void send(QueueEntry entry, boolean batch) throws AMQException
+ {
+
+
+ synchronized (getChannel())
+ {
+ getChannel().getProtocolSession().setDeferFlush(batch);
+ long deliveryTag = getChannel().getNextDeliveryTag();
+
+ addUnacknowledgedMessage(entry);
+ recordMessageDelivery(entry, deliveryTag);
+ entry.addStateChangeListener(getReleasedStateChangeListener());
+ sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ entry.incrementDeliveryCount();
+
+ }
+ }
+
+
+
+ }
+
+
+ private static final Logger _logger = Logger.getLogger(SubscriptionTarget_0_8.class);
+
+ private final AMQChannel _channel;
+
+ private final AMQShortString _consumerTag;
+
+ private final FlowCreditManager _creditManager;
+
+ private final Boolean _autoClose;
+
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+
+
+ public SubscriptionTarget_0_8(AMQChannel channel,
+ AMQShortString consumerTag,
+ FieldTable arguments,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(State.ACTIVE);
+
+ _channel = channel;
+ _consumerTag = consumerTag;
+
+ _creditManager = creditManager;
+ creditManager.addStateListener(this);
+
+ _deliveryMethod = deliveryMethod;
+ _recordMethod = recordMethod;
+
+ if (arguments != null)
+ {
+ Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+ if (autoClose != null)
+ {
+ _autoClose = (Boolean) autoClose;
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+
+ public void setSubscription(Subscription subscription)
+ {
+ _subscription = subscription;
+ }
+
+ public Subscription getSubscription()
+ {
+ return _subscription;
+ }
+
+ public AMQSessionModel getSessionModel()
+ {
+ return _channel;
+ }
+
+ public String toString()
+ {
+ String subscriber = "[channel=" + _channel +
+ ", consumerTag=" + _consumerTag +
+ ", session=" + getProtocolSession().getKey() ;
+
+ return subscriber + "]";
+ }
+
+ public boolean isSuspended()
+ {
+ return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
+ }
+
+ /**
+ * Callback indicating that a queue has been deleted.
+ *
+ */
+ public void queueDeleted()
+ {
+ _deleted.set(true);
+ }
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public FlowCreditManager getCreditManager()
+ {
+ return _creditManager;
+ }
+
+
+ public boolean close()
+ {
+ boolean closed = false;
+ State state = getState();
+
+ getSubscription().getSendLock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
+ {
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ }
+ _creditManager.removeListener(this);
+ return closed;
+ }
+ finally
+ {
+ getSubscription().releaseSendLock();
+ }
+ }
+
+
+ public boolean allocateCredit(QueueEntry msg)
+ {
+ return _creditManager.useCreditForMessage(msg.getMessage().getSize());
+ }
+
+ public AMQChannel getChannel()
+ {
+ return _channel;
+ }
+
+ public AMQShortString getConsumerTag()
+ {
+ return _consumerTag;
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _channel.getProtocolSession();
+ }
+
+ public void restoreCredit(final QueueEntry queueEntry)
+ {
+ _creditManager.restoreCredit(1, queueEntry.getSize());
+ }
+
+ protected final StateChangeListener<QueueEntry, QueueEntry.State> getReleasedStateChangeListener()
+ {
+ return _entryReleaseListener;
+ }
+
+ public void creditStateChanged(boolean hasCredit)
+ {
+
+ if(hasCredit)
+ {
+ if(!updateState(State.SUSPENDED, State.ACTIVE))
+ {
+ // this is a hack to get round the issue of increasing bytes credit
+ getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ }
+ }
+ else
+ {
+ updateState(State.ACTIVE, State.SUSPENDED);
+ }
+ }
+
+ protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
+ throws AMQException
+ {
+ _deliveryMethod.deliverToClient(getSubscription(), message, props, deliveryTag);
+
+ }
+
+
+ protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag)
+ {
+ _recordMethod.recordMessageDelivery(getSubscription(),entry,deliveryTag);
+ }
+
+
+ public void confirmAutoClose()
+ {
+ ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
+ }
+
+ public void queueEmpty() throws AMQException
+ {
+ if (isAutoClose())
+ {
+ close();
+ confirmAutoClose();
+ }
+ }
+
+ public void flushBatched()
+ {
+ _channel.getProtocolSession().setDeferFlush(false);
+
+ _channel.getProtocolSession().flushBatched();
+ }
+
+ protected void addUnacknowledgedMessage(QueueEntry entry)
+ {
+ final long size = entry.getSize();
+ _unacknowledgedBytes.addAndGet(size);
+ _unacknowledgedCount.incrementAndGet();
+ entry.addStateChangeListener(new StateChangeListener<QueueEntry, QueueEntry.State>()
+ {
+ public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
+ {
+ if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
+ {
+ _unacknowledgedBytes.addAndGet(-size);
+ _unacknowledgedCount.decrementAndGet();
+ entry.removeStateChangeListener(this);
+ }
+ }
+ });
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
+}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 4b569ccc71..a48ae3826e 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -162,7 +162,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
queue.registerSubscription(sub,false);
- queue.flushSubscription(sub);
+ sub.flush();
queue.unregisterSubscription(sub);
return(!singleMessageCredit.hasCredit());
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
index f2ab154b32..90f80a27e7 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
@@ -98,8 +98,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
" on channel:" + channel.debugIdentity());
}
- message.reject();
-
if (body.getRequeue())
{
channel.requeue(deliveryTag);
@@ -110,7 +108,9 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
}
else
{
- final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+ message.reject();
+
+ final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
_logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
if (maxDeliveryCountEnabled)
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 3fdce83c2a..263175d590 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -134,8 +135,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
};
protocolConnection.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new AMQQueue.Task() {
- public void doTask(AMQQueue queue) throws AMQException
+ queue.addQueueDeleteTask(new Action<AMQQueue>() {
+ public void performAction(AMQQueue queue)
{
protocolConnection.removeSessionCloseTask(sessionCloseTask);
}
@@ -245,9 +246,9 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
session.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
session.removeSessionCloseTask(deleteQueueTask);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index ef0837b3c6..6d3e758144 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -60,7 +60,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
private static final Logger _logger = Logger.getLogger(InternalTestProtocolSession.class);
// ChannelID(LIST) -> LinkedList<Pair>
- private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
+ private final Map<Integer, Map<String, LinkedList<DeliveryPair>>> _channelDelivers;
private AtomicInteger _deliveryCount = new AtomicInteger(0);
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
@@ -68,7 +68,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null);
- _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+ _channelDelivers = new HashMap<Integer, Map<String, LinkedList<DeliveryPair>>>();
setTestAuthorizedSubject();
setVirtualHost(virtualHost);
@@ -117,7 +117,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
synchronized (_channelDelivers)
{
- List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
+ List<DeliveryPair> all =_channelDelivers.get(channelId).get(AMQShortString.toString(consumerTag));
if (all == null)
{
@@ -153,23 +153,23 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
synchronized (_channelDelivers)
{
- Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
+ Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
if (consumers == null)
{
- consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ consumers = new HashMap<String, LinkedList<DeliveryPair>>();
_channelDelivers.put(channelId, consumers);
}
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(AMQShortString.toString(consumerTag));
if (consumerDelivers == null)
{
consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(consumerTag, consumerDelivers);
+ consumers.put(consumerTag.toString(), consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, msg));
}
}
@@ -254,20 +254,20 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
synchronized (_channelDelivers)
{
- Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
+ Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
if (consumers == null)
{
- consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ consumers = new HashMap<String, LinkedList<DeliveryPair>>();
_channelDelivers.put(_channelId, consumers);
}
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag());
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getName());
if (consumerDelivers == null)
{
consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
+ consumers.put(sub.getName(), consumerDelivers);
}
consumerDelivers.add(new DeliveryPair(deliveryTag, message));
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 4082f22e9c..41e2fef03f 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
@@ -53,16 +54,8 @@ public class Connection_1_0 implements ConnectionEventListener
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
-
-
- public static interface Task
- {
- public void doTask(Connection_1_0 connection);
- }
-
-
- private List<Task> _closeTasks =
- Collections.synchronizedList(new ArrayList<Task>());
+ private List<Action<Connection_1_0>> _closeTasks =
+ Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>());
@@ -98,26 +91,26 @@ public class Connection_1_0 implements ConnectionEventListener
_sessions.remove(session);
}
- void removeConnectionCloseTask(final Task task)
+ void removeConnectionCloseTask(final Action<Connection_1_0> task)
{
_closeTasks.remove( task );
}
- void addConnectionCloseTask(final Task task)
+ void addConnectionCloseTask(final Action<Connection_1_0> task)
{
_closeTasks.add( task );
}
public void closeReceived()
{
- List<Task> taskCopy;
+ List<Action<Connection_1_0>> taskCopy;
synchronized (_closeTasks)
{
- taskCopy = new ArrayList<Task>(_closeTasks);
+ taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks);
}
- for(Task task : taskCopy)
+ for(Action<Connection_1_0> task : taskCopy)
{
- task.doTask(this);
+ task.performAction(this);
}
synchronized (_closeTasks)
{
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 4abf1bf76b..400bc1d085 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -69,6 +69,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
@@ -167,7 +168,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY);
+ _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY, messageFilter == null ? null : new SimpleFilterManager(messageFilter));
}
else if(destination instanceof ExchangeDestination)
{
@@ -309,10 +310,10 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
final String queueName = name;
final AMQQueue tempQueue = queue;
- final Connection_1_0.Task deleteQueueTask =
- new Connection_1_0.Task()
+ final Action<Connection_1_0> deleteQueueTask =
+ new Action<Connection_1_0>()
{
- public void doTask(Connection_1_0 session)
+ public void performAction(Connection_1_0 session)
{
if (_vhost.getQueue(queueName) == tempQueue)
{
@@ -331,9 +332,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
getSession().getConnection().removeConnectionCloseTask(deleteQueueTask);
}
@@ -356,17 +357,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
_logger.error("Error", e);
}
- _subscription = new Subscription_1_0(this, qd, true);
+ _subscription = new Subscription_1_0(this, qd, true, messageFilter == null ? null : new SimpleFilterManager(messageFilter));
}
if(_subscription != null)
{
_subscription.setNoLocal(noLocal);
- if(messageFilter!=null)
- {
- _subscription.setFilters(new SimpleFilterManager(messageFilter));
- }
+
try
{
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 823e4cb16d..51ff9c13cb 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -48,6 +48,7 @@ import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.*;
@@ -343,10 +344,10 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
{
- final Connection_1_0.Task deleteQueueTask =
- new Connection_1_0.Task()
+ final Action<Connection_1_0> deleteQueueTask =
+ new Action<Connection_1_0>()
{
- public void doTask(Connection_1_0 session)
+ public void performAction(Connection_1_0 session)
{
if (_vhost.getQueue(queueName) == tempQueue)
{
@@ -365,9 +366,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
_connection.addConnectionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
_connection.removeConnectionCloseTask(deleteQueueTask);
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index 6a3f5b46e1..149f89fab1 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -22,10 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
@@ -41,63 +38,37 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.Modified;
import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.AbstractSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
-class
- Subscription_1_0 implements Subscription
+class Subscription_1_0 extends AbstractSubscription implements Subscription
{
private SendingLink_1_0 _link;
- private AMQQueue _queue;
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED);
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
- private final long _id;
- private final boolean _acquires;
- private volatile AMQQueue.Context _queueContext;
- private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private ReentrantLock _stateChangeLock = new ReentrantLock();
-
- private boolean _noLocal;
- private FilterManager _filters;
-
private long _deliveryTag = 0L;
- private StateListener _stateListener;
private Binary _transactionId;
- private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer();
- private SectionEncoder _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
-
- public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination)
- {
- this(link, destination, ((Source)link.getEndpoint().getSource()).getDistributionMode() != StdDistMode.COPY);
- }
+ private final AMQPDescribedTypeRegistry _typeRegistry;
+ private final SectionEncoder _sectionEncoder;
- public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires)
+ public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires, FilterManager filters)
{
+ super(filters,Message_1_0.class,link.getSession().getConnectionReference(), acquires, acquires, link.getEndpoint().getName(), false);
_link = link;
- _queue = destination.getQueue();
- _id = getEndpoint().getLocalHandle().longValue();
- _acquires = acquires;
+ _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
+ _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
+ setQueue(destination.getQueue(),false);
+ updateState(State.ACTIVE, State.SUSPENDED);
}
private SendingLinkEndpoint getEndpoint()
@@ -105,89 +76,40 @@ class
return _link.getEndpoint();
}
- public LogActor getLogActor()
- {
- return null; //TODO
- }
-
- public boolean isTransient()
- {
- return true; //TODO
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public void setQueue(final AMQQueue queue, final boolean exclusive)
- {
- //TODO
- }
-
- public void setNoLocal(final boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
- public long getSubscriptionID()
- {
- return _id;
- }
-
public boolean isSuspended()
{
return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend();
}
- public boolean hasInterest(final QueueEntry entry)
+ public void close()
{
- if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference())
+ boolean closed = false;
+ State state = getState();
+
+ getSendLock();
+ try
{
- return false;
+ while(!closed && state != State.CLOSED)
+ {
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ else
+ {
+ getStateListener().stateChanged(this, state, State.CLOSED);
+ }
+ }
}
- else if(!(entry.getMessage() instanceof Message_1_0)
- && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
+ finally
{
- return false;
+ releaseSendLock();
}
- return checkFilters(entry);
-
- }
-
- private boolean checkFilters(final QueueEntry entry)
- {
- return (_filters == null) || _filters.allAllow(entry.asFilterable());
- }
-
- public boolean isClosed()
- {
- return !getEndpoint().isAttached();
}
- public boolean acquires()
- {
- return _acquires;
- }
-
- public boolean seesRequeues()
- {
- // TODO
- return acquires();
- }
-
- public void close()
- {
- getEndpoint().detach();
- }
-
- public void send(QueueEntry entry, boolean batch) throws AMQException
+ protected void doSend(QueueEntry entry, boolean batch) throws AMQException
{
// TODO
send(entry);
@@ -301,7 +223,7 @@ class
}
else
{
- UnsettledAction action = _acquires
+ UnsettledAction action = acquires()
? new DispositionAction(tag, queueEntry)
: new DoNothingAction(tag, queueEntry);
@@ -315,7 +237,7 @@ class
transfer.setState(state);
}
// TODO - need to deal with failure here
- if(_acquires && _transactionId != null)
+ if(acquires() && _transactionId != null)
{
ServerTransaction txn = _link.getTransaction(_transactionId);
if(txn != null)
@@ -352,7 +274,7 @@ class
}
- public void queueDeleted(final AMQQueue queue)
+ public void queueDeleted()
{
//TODO
getEndpoint().setSource(null);
@@ -373,98 +295,33 @@ class
}
}
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
public void suspend()
{
synchronized(_link.getLock())
{
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ if(updateState(State.ACTIVE, State.SUSPENDED))
{
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED);
}
}
}
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public void releaseQueueEntry(QueueEntry queueEntryImpl)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- public void onDequeue(final QueueEntry queueEntry)
- {
- //TODO
- }
public void restoreCredit(final QueueEntry queueEntry)
{
//TODO
}
- public void setStateListener(final StateListener listener)
- {
- _stateListener = listener;
- }
-
- public State getState()
- {
- return _state.get();
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context queueContext)
- {
- _queueContext = queueContext;
- }
-
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public void set(String key, Object value)
- {
- _properties.put(key, value);
- }
-
- public Object get(String key)
- {
- return _properties.get(key);
- }
-
- public boolean isSessionTransactional()
- {
- return false; //TODO
- }
-
public void queueEmpty()
{
synchronized(_link.getLock())
{
if(_link.drained())
{
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ if(updateState(State.ACTIVE, State.SUSPENDED))
{
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED);
}
}
}
@@ -476,9 +333,9 @@ class
{
if(isSuspended() && getEndpoint() != null)
{
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ if(updateState(State.SUSPENDED, State.ACTIVE))
{
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ getStateListener().stateChanged(this, State.SUSPENDED, State.ACTIVE);
}
_transactionId = _link.getTransactionId();
}
@@ -640,16 +497,6 @@ class
}
}
- public FilterManager getFilters()
- {
- return _filters;
- }
-
- public void setFilters(final FilterManager filters)
- {
- _filters = filters;
- }
-
@Override
public AMQSessionModel getSessionModel()
{
@@ -658,20 +505,6 @@ class
}
@Override
- public long getBytesOut()
- {
- // TODO
- return 0;
- }
-
- @Override
- public long getMessagesOut()
- {
- // TODO
- return 0;
- }
-
- @Override
public long getUnacknowledgedBytes()
{
// TODO
@@ -685,10 +518,4 @@ class
return 0;
}
- @Override
- public String getConsumerName()
- {
- //TODO
- return "TODO";
- }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
index 9f532ec5f7..4defbd7eb0 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
@@ -177,7 +177,8 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
List<String> results = findMatches(SUB_PREFIX);
- assertEquals("Result set larger than expected.", 2, results.size());
+ final int expected = isBroker010() ? 5 : 2;
+ assertEquals("Result set larger than expected.", expected, results.size());
String log = getLogMessage(results, 0);
@@ -185,14 +186,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
String message = getMessageString(fromMessage(log));
assertTrue("Browser not on log message:" + message, message.contains("Browser"));
- if(!isBroker010())
- {
- assertTrue("AutoClose not on log message:" + message, message.contains("AutoClose"));
- }
// Beacause it is an auto close and we have no messages on the queue we
// will get a close message
- log = getLogMessage(results, 1);
+ log = getLogMessage(results, expected-1);
validateMessageID("SUB-1002", log);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java b/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
index 05c8e362a1..d558e07ed7 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
@@ -67,16 +67,15 @@ public class ConnectionRestTest extends QpidRestTestCase
producer.send(_session.createTextMessage("Test-" + i));
}
_session.commit();
-
Message m = consumer.receive(1000l);
- assertNotNull("Message was not received", m);
+ assertNotNull("First message was not received", m);
_session.commit();
// receive the rest of messages for rollback
for (int i = 0; i < MESSAGE_NUMBER - 1; i++)
{
m = consumer.receive(1000l);
- assertNotNull("Message was not received", m);
+ assertNotNull("Subsequent messages were not received", m);
}
_session.rollback();
@@ -84,7 +83,7 @@ public class ConnectionRestTest extends QpidRestTestCase
for (int i = 0; i < MESSAGE_NUMBER - 1; i++)
{
m = consumer.receive(1000l);
- assertNotNull("Message was not received", m);
+ assertNotNull("Message was not received after rollback", m);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
index 54c3225cec..b6f81da690 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
@@ -32,6 +32,7 @@ import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -169,7 +170,8 @@ public class RollbackOrderTest extends QpidBrokerTestCase
//Start the session now so we
_connection.start();
- count.await();
+ count.await(10l, TimeUnit.SECONDS);
+ assertEquals("Not all message received. Count should be 0.", 0, count.getCount());
for (Exception e : exceptions)
{