summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-03 23:28:12 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-03 23:28:12 +0000
commit7ec70e560ee9a446a060d168b04a30ebc305c75c (patch)
treebd8aeefe542b432c6a37950e35ca4ca6a0baeeb6
parent7a690b854f6492e4d517320598d93f4d30081e28 (diff)
downloadqpid-python-7ec70e560ee9a446a060d168b04a30ebc305c75c.tar.gz
Change subscription registration for queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564130 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java8
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java34
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java1
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java9
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java4
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java12
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java5
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java20
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java14
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java232
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java167
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java35
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java27
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java16
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java94
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java68
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java142
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java57
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java21
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java65
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java11
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java96
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java38
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java17
25 files changed, 554 insertions, 641 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index f6602c8071..d822705a39 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -25,15 +25,19 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
+import java.util.EnumSet;
import java.util.List;
import java.util.Set;
@@ -84,7 +88,9 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
VirtualHost getVirtualHost();
- void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
+ Subscription registerSubscription(final SubscriptionTarget target, final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName, EnumSet<Subscription.Option> options) throws AMQException;
void unregisterSubscription(final Subscription subscription) throws AMQException;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 5f79498beb..d73bd638b8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -18,15 +18,7 @@
*/
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -43,6 +35,7 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -55,9 +48,11 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
+import org.apache.qpid.server.subscription.DelegatingSubscription;
import org.apache.qpid.server.subscription.MessageGroupManager;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -390,7 +385,25 @@ public class SimpleAMQQueue implements AMQQueue,
// ------ Manage Subscriptions
- public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive)
+
+ @Override
+ public Subscription registerSubscription(final SubscriptionTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<Subscription.Option> optionSet) throws AMQException
+ {
+
+ DelegatingSubscription sub = new DelegatingSubscription(filters, messageClass,
+ optionSet.contains(Subscription.Option.ACQUIRES),
+ optionSet.contains(Subscription.Option.SEES_REQUEUES),
+ consumerName, optionSet.contains(Subscription.Option.TRANSIENT), target);
+ target.subscriptionRegistered(sub);
+ registerSubscription(sub, optionSet.contains(Subscription.Option.EXCLUSIVE));
+ return sub;
+ }
+
+ private synchronized void registerSubscription(final Subscription subscription, final boolean exclusive)
throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive
{
// Access control
@@ -479,6 +492,7 @@ public class SimpleAMQQueue implements AMQQueue,
setExclusiveSubscriber(null);
subscription.setQueueContext(null);
+
if(_messageGroupManager != null)
{
resetSubPointersForGroups(subscription, true);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
index 213d8b7730..9b97f87d04 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
@@ -178,7 +178,7 @@ public abstract class AbstractSubscription implements Subscription
else
{
// no interest in messages we can't convert
- if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null)
+ if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null)
{
return false;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
index 623371c84c..602cabcf75 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
@@ -143,6 +143,7 @@ public class DelegatingSubscription<T extends SubscriptionTarget> extends Abstra
public void close()
{
_target.close();
+ _target.subscriptionRemoved(this);
}
@Override
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index f9278cf719..03a56f7dc2 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -32,6 +32,15 @@ public interface Subscription
{
AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+ enum Option
+ {
+ ACQUIRES,
+ SEES_REQUEUES,
+ TRANSIENT,
+ EXCLUSIVE,
+ NO_LOCAL
+ }
+
LogActor getLogActor();
boolean isTransient();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
index 0b0be38f42..bdcd0c15fb 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
@@ -36,6 +36,10 @@ public interface SubscriptionTarget
State getState();
+ void subscriptionRegistered(Subscription sub);
+
+ void subscriptionRemoved(Subscription sub);
+
void setStateListener(StateChangeListener<SubscriptionTarget, State> listener);
long getUnacknowledgedBytes();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java
index a0a2a7b648..6b59f65940 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java
@@ -20,11 +20,17 @@
*/
package org.apache.qpid.server.logging.actors;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.BrokerTestHelper;
import java.util.List;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Test : AMQPConnectionActorTest
* Validate the AMQPConnectionActor class.
@@ -42,9 +48,11 @@ public class SubscriptionActorTest extends BaseConnectionActorTestCase
{
super.setUp();
- MockSubscription mockSubscription = new MockSubscription();
+ Subscription mockSubscription = mock(Subscription.class);
+ final AMQQueue queue = BrokerTestHelper.createQueue(getName(), getVirtualHost());
+ when(mockSubscription.getQueue()).thenReturn(queue);
+ when(mockSubscription.getSubscriptionID()).thenReturn(0l);
- mockSubscription.setQueue(BrokerTestHelper.createQueue(getName(), getVirtualHost()), false);
setAmqpActor(new SubscriptionActor(getRootLogger(), mockSubscription));
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index a468fa072b..1947a4e3b6 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -28,7 +28,10 @@ import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import java.util.ArrayList;
+import java.util.EnumSet;
+
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.subscription.Subscription;
import static org.mockito.Mockito.when;
@@ -62,7 +65,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
queue.enqueue(createMessage(9L, (byte) 0));
// Register subscriber
- queue.registerSubscription(getSubscription(), false);
+ queue.registerSubscription(getSubscription(), null, null, "test", EnumSet.noneOf(Subscription.Option.class));
Thread.sleep(150);
ArrayList<QueueEntry> msgs = getSubscription().getMessages();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 98ecdcdd3b..e8b644045a 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -24,16 +24,20 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.subscription.DelegatingSubscription;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -203,16 +207,24 @@ public class MockAMQQueue implements AMQQueue
return _virtualhost;
}
- public String getName()
+ @Override
+ public Subscription registerSubscription(final SubscriptionTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ final EnumSet<Subscription.Option> options) throws AMQException
{
- return _name;
+ return new DelegatingSubscription(filters, messageClass, options.contains(Subscription.Option.ACQUIRES),
+ options.contains(Subscription.Option.SEES_REQUEUES), consumerName,
+ options.contains(Subscription.Option.TRANSIENT), target );
}
- public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException
+ public String getName()
{
-
+ return _name;
}
+
public void unregisterSubscription(Subscription subscription) throws AMQException
{
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 2b1e7f5e1f..2753a89bf4 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -113,11 +113,19 @@ public abstract class QueueEntryImplTestBase extends TestCase
*/
private void acquire()
{
- _queueEntry.acquire(new MockSubscription());
+ _queueEntry.acquire(newMockSubscription());
assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
_queueEntry.isAcquired());
}
+ private Subscription newMockSubscription()
+ {
+ final Subscription subscription = mock(Subscription.class);
+ when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
+ when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
+ return subscription;
+ }
+
/**
* A helper method to get entry state
*
@@ -145,7 +153,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
*/
public void testRejectAndRejectedBy()
{
- Subscription sub = new MockSubscription();
+ Subscription sub = newMockSubscription();
long subId = sub.getSubscriptionID();
assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
@@ -160,7 +168,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
//repeat rejection using a second subscription
- Subscription sub2 = new MockSubscription();
+ Subscription sub2 = newMockSubscription();
long sub2Id = sub2.getSubscriptionID();
assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index ffd64774c0..3bb7102d2a 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -29,6 +29,8 @@ import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.EnumSet;
import java.util.Map;
import org.apache.log4j.Logger;
@@ -44,6 +46,7 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -65,7 +68,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
private String _owner = "owner";
private String _routingKey = "routing key";
private DirectExchange _exchange;
- private MockSubscription _subscription = new MockSubscription();
+ private MockSubscription _subscriptionTarget = new MockSubscription();
+ private Subscription _subscription;
private Map<String,Object> _arguments = null;
@Override
@@ -159,17 +163,17 @@ public class SimpleAMQQueueTest extends QpidTestCase
public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException
{
+ ServerMessage messageA = createMessage(new Long(24));
+
// Check adding a subscription adds it to the queue
- _queue.registerSubscription(_subscription, false);
- assertEquals("Subscription did not get queue", _queue,
- _subscription.getQueue());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Subscription.Option.class));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
- _queue.getActiveConsumerCount());
+ _queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
try
{
@@ -179,14 +183,14 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
}
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull(((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertNull(((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
- assertTrue("Subscription still had queue", _subscription.isClosed());
+ assertTrue("Subscription still had queue", _subscriptionTarget.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
assertFalse("Queue still has active consumer",
- 1 == _queue.getActiveConsumerCount());
+ 1 == _queue.getActiveConsumerCount());
ServerMessage messageB = createMessage(new Long (25));
_queue.enqueue(messageB);
@@ -198,10 +202,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
- _queue.registerSubscription(_subscription, false);
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Subscription.Option.class));
Thread.sleep(150);
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertNull("There should be no releasedEntry after an enqueue", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
}
/**
@@ -213,10 +218,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage messageB = createMessage(new Long(25));
_queue.enqueue(messageA);
_queue.enqueue(messageB);
- _queue.registerSubscription(_subscription, false);
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Subscription.Option.class));
Thread.sleep(150);
assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertNull("There should be no releasedEntry after enqueues", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
}
/**
@@ -225,7 +231,15 @@ public class SimpleAMQQueueTest extends QpidTestCase
*/
public void testReleasedMessageIsResentToSubscriber() throws Exception
{
- _queue.registerSubscription(_subscription, false);
+
+ ServerMessage messageA = createMessage(new Long(24));
+ ServerMessage messageB = createMessage(new Long(25));
+ ServerMessage messageC = createMessage(new Long(26));
+
+
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Subscription.Option.ACQUIRES,
+ Subscription.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -236,10 +250,6 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
};
- ServerMessage messageA = createMessage(new Long(24));
- ServerMessage messageB = createMessage(new Long(25));
- ServerMessage messageC = createMessage(new Long(26));
-
/* Enqueue three messages */
_queue.enqueue(messageA, postEnqueueAction);
@@ -248,7 +258,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -259,11 +269,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to subscription", 4, _subscriptionTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
}
/**
@@ -273,7 +283,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
*/
public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception
{
- _queue.registerSubscription(_subscription, false);
+ ServerMessage messageA = createMessage(new Long(24));
+
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -286,7 +300,6 @@ public class SimpleAMQQueueTest extends QpidTestCase
/* Enqueue one message with expiration set for a short time in the future */
- ServerMessage messageA = createMessage(new Long(24));
int messageExpirationOffset = 200;
final long expiration = System.currentTimeMillis() + messageExpirationOffset;
when(messageA.getExpiration()).thenReturn(expiration);
@@ -296,7 +309,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
int subFlushWaitTime = 150;
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to subscription", 1, _subscriptionTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
/* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
@@ -306,9 +319,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
- assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size());
+ assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
}
@@ -320,7 +333,14 @@ public class SimpleAMQQueueTest extends QpidTestCase
*/
public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception
{
- _queue.registerSubscription(_subscription, false);
+
+ ServerMessage messageA = createMessage(new Long(24));
+ ServerMessage messageB = createMessage(new Long(25));
+ ServerMessage messageC = createMessage(new Long(26));
+
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Subscription.Option.ACQUIRES,
+ Subscription.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -331,10 +351,6 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
};
- ServerMessage messageA = createMessage(new Long(24));
- ServerMessage messageB = createMessage(new Long(25));
- ServerMessage messageC = createMessage(new Long(26));
-
/* Enqueue three messages */
_queue.enqueue(messageA, postEnqueueAction);
@@ -343,7 +359,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -355,11 +371,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to subscription", 5, _subscriptionTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
}
@@ -369,11 +385,21 @@ public class SimpleAMQQueueTest extends QpidTestCase
*/
public void testReleaseForQueueWithMultipleSubscriptions() throws Exception
{
- MockSubscription subscription1 = new MockSubscription();
- MockSubscription subscription2 = new MockSubscription();
+ ServerMessage messageA = createMessage(new Long(24));
+ ServerMessage messageB = createMessage(new Long(25));
+
+ MockSubscription target1 = new MockSubscription();
+ MockSubscription target2 = new MockSubscription();
+
+
+ Subscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test",
+ EnumSet.of(Subscription.Option.ACQUIRES,
+ Subscription.Option.SEES_REQUEUES));
+
+ Subscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test",
+ EnumSet.of(Subscription.Option.ACQUIRES,
+ Subscription.Option.SEES_REQUEUES));
- _queue.registerSubscription(subscription1, false);
- _queue.registerSubscription(subscription2, false);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -384,8 +410,6 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
};
- ServerMessage messageA = createMessage(new Long(24));
- ServerMessage messageB = createMessage(new Long(25));
/* Enqueue two messages */
@@ -394,31 +418,36 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to both after enqueue",
+ 2,
+ target1.getMessages().size() + target2.getMessages().size());
/* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to both subscriptions after release",
+ 3,
+ target1.getMessages().size() + target2.getMessages().size());
assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry());
assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry());
}
public void testExclusiveConsumer() throws AMQException
{
+ ServerMessage messageA = createMessage(new Long(24));
// Check adding an exclusive subscription adds it to the queue
- _queue.registerSubscription(_subscription, true);
- assertEquals("Subscription did not get queue", _queue,
- _subscription.getQueue());
+
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Subscription.Option.EXCLUSIVE));
+
assertEquals("Queue does not have consumer", 1,
- _queue.getConsumerCount());
+ _queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
- _queue.getActiveConsumerCount());
+ _queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
- ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
try
{
@@ -430,11 +459,14 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
- Subscription subB = new MockSubscription();
+ MockSubscription subB = new MockSubscription();
Exception ex = null;
try
{
- _queue.registerSubscription(subB, false);
+
+ _queue.registerSubscription(subB, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Subscription.Option.class));
+
}
catch (AMQException e)
{
@@ -445,10 +477,15 @@ public class SimpleAMQQueueTest extends QpidTestCase
// Check we cannot add an exclusive subscriber to a queue with an
// existing subscription
_queue.unregisterSubscription(_subscription);
- _queue.registerSubscription(_subscription, false);
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Subscription.Option.class));
+
try
{
- _queue.registerSubscription(subB, true);
+
+ _subscription = _queue.registerSubscription(subB, null, messageA.getClass(), "test",
+ EnumSet.of(Subscription.Option.EXCLUSIVE));
+
}
catch (AMQException e)
{
@@ -462,8 +499,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.stop();
_queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP);
_queue.setDeleteOnNoConsumers(true);
- _queue.registerSubscription(_subscription, false);
- ServerMessage message = createMessage(new Long(25));
+
+ ServerMessage message = createMessage(new Long(25));
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
+ EnumSet.noneOf(Subscription.Option.class));
+
_queue.enqueue(message);
_queue.unregisterSubscription(_subscription);
assertTrue("Queue was not deleted when subscription was removed",
@@ -472,9 +512,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
public void testResend() throws Exception
{
- _queue.registerSubscription(_subscription, false);
Long id = new Long(26);
ServerMessage message = createMessage(id);
+
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
+ EnumSet.noneOf(Subscription.Option.class));
+
_queue.enqueue(message);
QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
entry.setRedelivered();
@@ -649,18 +692,21 @@ public class SimpleAMQQueueTest extends QpidTestCase
// in.Bias over 50% of the messages to the first subscription so that
// the later subscriptions reject them and report being done before
// the first subscription as the processQueue method proceeds.
- List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3);
- List<QueueEntry> msgListSub2 = createEntriesList(msg4);
- List<QueueEntry> msgListSub3 = createEntriesList(msg5);
+ List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
+ List<String> msgListSub2 = createEntriesList(msg4);
+ List<String> msgListSub3 = createEntriesList(msg5);
MockSubscription sub1 = new MockSubscription(msgListSub1);
MockSubscription sub2 = new MockSubscription(msgListSub2);
MockSubscription sub3 = new MockSubscription(msgListSub3);
// register the subscriptions
- testQueue.registerSubscription(sub1, false);
- testQueue.registerSubscription(sub2, false);
- testQueue.registerSubscription(sub3, false);
+ testQueue.registerSubscription(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
+ testQueue.registerSubscription(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
+ testQueue.registerSubscription(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
//check that no messages have been delivered to the
//subscriptions during registration
@@ -680,9 +726,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
});
// check expected messages delivered to correct consumers
- verifyReceivedMessages(msgListSub1, sub1.getMessages());
- verifyReceivedMessages(msgListSub2, sub2.getMessages());
- verifyReceivedMessages(msgListSub3, sub3.getMessages());
+ verifyReceivedMessages(Arrays.asList(msg1,msg2,msg3), sub1.getMessages());
+ verifyReceivedMessages(Collections.singletonList(msg4), sub2.getMessages());
+ verifyReceivedMessages(Collections.singletonList(msg5), sub3.getMessages());
}
/**
@@ -883,7 +929,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
try
{
// subscribe
- testQueue.registerSubscription(subscription, false);
+ testQueue.registerSubscription(subscription, null, entries.get(0).getMessage().getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
// process queue
testQueue.processQueue(new QueueRunner(testQueue)
@@ -907,7 +953,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
Thread.currentThread().interrupt();
}
- List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3));
+ List<QueueEntry> expected = Arrays.asList(entries.get(0), entries.get(2), entries.get(3));
verifyReceivedMessages(expected, subscription.getMessages());
}
@@ -970,7 +1016,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
// register subscription
try
{
- queue.registerSubscription(subscription, false);
+ queue.registerSubscription(subscription, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
}
catch (AMQException e)
{
@@ -997,52 +1043,51 @@ public class SimpleAMQQueueTest extends QpidTestCase
//verify adding an active subscription increases the count
final MockSubscription subscription1 = new MockSubscription();
subscription1.setActive(true);
+ subscription1.setState(SubscriptionTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- queue.registerSubscription(subscription1, false);
+ queue.registerSubscription(subscription1, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify adding an inactive subscription doesn't increase the count
final MockSubscription subscription2 = new MockSubscription();
subscription2.setActive(false);
+ subscription2.setState(SubscriptionTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- queue.registerSubscription(subscription2, false);
+ queue.registerSubscription(subscription2, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify behaviour in face of expected state changes:
//verify a subscription going suspended->active increases the count
- queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
+ subscription2.setState(SubscriptionTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
//verify a subscription going active->suspended decreases the count
- queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
+ subscription2.setState(SubscriptionTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going suspended->closed doesn't change the count
- queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify a subscription going active->closed decreases the count
- queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
- assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
-
- //verify behaviour in face of unexpected state changes:
-
- //verify a subscription going closed->active increases the count
- queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
+ subscription2.setState(SubscriptionTarget.State.CLOSED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going active->active doesn't change the count
- queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
+ subscription1.setState(SubscriptionTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going closed->suspended doesn't change the count
- queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
//verify a subscription going suspended->suspended doesn't change the count
- queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
+ subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+ subscription1.setState(SubscriptionTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going active->closed decreases the count
+ subscription1.setState(SubscriptionTarget.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
}
public void testNotificationFiredOnEnqueue() throws Exception
@@ -1167,12 +1212,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
return entry;
}
- private List<QueueEntry> createEntriesList(QueueEntry... entries)
+ private List<String> createEntriesList(QueueEntry... entries)
{
- ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
+ ArrayList<String> entriesList = new ArrayList<String>();
for (QueueEntry entry : entries)
{
- entriesList.add(entry);
+ entriesList.add(entry.getMessage().getMessageHeader().getMessageId());
}
return entriesList;
}
@@ -1197,7 +1242,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
public MockSubscription getSubscription()
{
- return _subscription;
+ return _subscriptionTarget;
}
public Map<String,Object> getArguments()
@@ -1213,14 +1258,15 @@ public class SimpleAMQQueueTest extends QpidTestCase
protected ServerMessage createMessage(Long id) throws AMQException
{
+ AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(header.getMessageId()).thenReturn(String.valueOf(id));
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn(id);
+ when(message.getMessageHeader()).thenReturn(header);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
- AMQMessageHeader hdr = mock(AMQMessageHeader.class);
- when(message.getMessageHeader()).thenReturn(hdr);
when(message.newReference()).thenReturn(ref);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index eec1edca35..2e52d47326 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -23,6 +23,10 @@ package org.apache.qpid.server.subscription;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Port;
@@ -42,36 +46,34 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-public class MockSubscription implements Subscription
+public class MockSubscription implements SubscriptionTarget
{
+ private final List<String> _messageIds;
private boolean _closed = false;
private String tag = "mocktag";
private AMQQueue queue = null;
- private StateChangeListener<Subscription, State> _listener = null;
- private volatile AMQQueue.Context _queueContext = null;
+ private StateChangeListener<SubscriptionTarget, State> _listener = null;
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
- private List<QueueEntry> _acceptEntries = null;
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private static final AtomicLong idGenerator = new AtomicLong(0);
// Create a simple ID that increments for ever new Subscription
- private final long _subscriptionID = idGenerator.getAndIncrement();
private boolean _isActive = true;
+ private Subscription _subscription;
public MockSubscription()
{
+ _messageIds = null;
}
- public MockSubscription(List<QueueEntry> acceptEntries)
+ public MockSubscription(List<String> messageIds)
{
- _acceptEntries = acceptEntries;
+ _messageIds = messageIds;
}
- public void close()
+ public boolean close()
{
_closed = true;
if (_listener != null)
@@ -79,6 +81,7 @@ public class MockSubscription implements Subscription
_listener.stateChanged(this, _state, State.CLOSED);
}
_state = State.CLOSED;
+ return true;
}
public String getName()
@@ -86,45 +89,26 @@ public class MockSubscription implements Subscription
return tag;
}
- @Override
- public void flush() throws AMQException
- {
-
- }
-
- public long getSubscriptionID()
- {
- return _subscriptionID;
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public LogActor getLogActor()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isTransient()
- {
- return false;
- }
-
- public long getBytesOut()
- {
- return 0; // TODO - Implement
- }
-
- public long getMessagesOut()
+ public FilterManager getFilters()
{
- return 0; // TODO - Implement
+ if(_messageIds != null)
+ {
+ SimpleFilterManager filters = new SimpleFilterManager();
+ filters.add(new MessageFilter()
+ {
+ @Override
+ public boolean matches(final Filterable message)
+ {
+ final String messageId = message.getMessageHeader().getMessageId();
+ return _messageIds.contains(messageId);
+ }
+ });
+ return filters;
+ }
+ else
+ {
+ return null;
+ }
}
public long getUnacknowledgedBytes()
@@ -147,62 +131,18 @@ public class MockSubscription implements Subscription
return new MockSessionModel();
}
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
-
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public boolean hasInterest(QueueEntry entry)
- {
- if(_acceptEntries != null)
- {
- //simulate selector behaviour, only signal
- //interest in the dictated queue entries
- return _acceptEntries.contains(entry);
- }
-
- return true;
- }
-
public boolean isActive()
{
return _isActive ;
}
- public void set(String key, Object value)
- {
- }
- public Object get(String key)
- {
- return null;
- }
-
- public boolean isAutoClose()
- {
- return false;
- }
public boolean isClosed()
{
return _closed;
}
- public boolean acquires()
- {
- return true;
- }
-
- public boolean seesRequeues()
- {
- return true;
- }
public boolean isSuspended()
{
@@ -213,11 +153,6 @@ public class MockSubscription implements Subscription
{
}
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
public void restoreCredit(QueueEntry queueEntry)
{
}
@@ -236,33 +171,37 @@ public class MockSubscription implements Subscription
}
- public void setQueueContext(AMQQueue.Context queueContext)
+ public State getState()
{
- _queueContext = queueContext;
+ return _state;
}
- public void setQueue(AMQQueue queue, boolean exclusive)
+ @Override
+ public void subscriptionRegistered(final Subscription sub)
{
- this.queue = queue;
+ _subscription = sub;
}
- public void setNoLocal(boolean noLocal)
+ @Override
+ public void subscriptionRemoved(final Subscription sub)
{
- }
- public void setStateListener(StateChangeListener<Subscription, State> listener)
- {
- this._listener = listener;
}
- public State getState()
+ public void setState(State state)
{
- return _state;
+ State oldState = _state;
+ _state = state;
+ if(_listener != null)
+ {
+ _listener.stateChanged(this, oldState, state);
+ }
}
- public boolean wouldSuspend(QueueEntry msg)
+ @Override
+ public void setStateListener(final StateChangeListener<SubscriptionTarget, State> listener)
{
- return false;
+ _listener = listener;
}
public ArrayList<QueueEntry> getMessages()
@@ -270,13 +209,15 @@ public class MockSubscription implements Subscription
return messages;
}
- public boolean isSessionTransactional()
+
+ public void queueEmpty() throws AMQException
{
- return false;
}
- public void queueEmpty() throws AMQException
+ @Override
+ public boolean allocateCredit(final QueueEntry msg)
{
+ return true;
}
public void setActive(final boolean isActive)
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java
index cd5b178464..4af28de307 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java
@@ -20,25 +20,29 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.SubscriptionList.SubscriptionNode;
import org.apache.qpid.server.subscription.SubscriptionList.SubscriptionNodeIterator;
import org.apache.qpid.test.utils.QpidTestCase;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class SubscriptionListTest extends QpidTestCase
{
private SubscriptionList _subList;
- private MockSubscription _sub1;
- private MockSubscription _sub2;
- private MockSubscription _sub3;
+ private Subscription _sub1;
+ private Subscription _sub2;
+ private Subscription _sub3;
private SubscriptionNode _node;
protected void setUp()
{
_subList = new SubscriptionList();
- _sub1 = new MockSubscription();
- _sub2 = new MockSubscription();
- _sub3 = new MockSubscription();
+ _sub1 = newMockSubscription();
+ _sub2 = newMockSubscription();
+ _sub3 = newMockSubscription();
_subList.add(_sub1);
_subList.add(_sub2);
@@ -47,6 +51,15 @@ public class SubscriptionListTest extends QpidTestCase
_node = _subList.getHead();
}
+
+ private Subscription newMockSubscription()
+ {
+ final Subscription subscription = mock(Subscription.class);
+ when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
+ when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
+ return subscription;
+ }
+
/**
* Test that if the first (non-head) node in the list is deleted (but is still present),
* it is not returned when searching through the list for the next viable node, and the
@@ -177,9 +190,9 @@ public class SubscriptionListTest extends QpidTestCase
assertEquals("Unexpected size result", 0, subList.size());
- Subscription sub1 = new MockSubscription();
- Subscription sub2 = new MockSubscription();
- Subscription sub3 = new MockSubscription();
+ Subscription sub1 = newMockSubscription();
+ Subscription sub2 = newMockSubscription();
+ Subscription sub3 = newMockSubscription();
subList.add(sub1);
assertEquals("Unexpected size result", 1, subList.size());
@@ -253,7 +266,7 @@ public class SubscriptionListTest extends QpidTestCase
*/
public void testRemoveNonexistentNode()
{
- Subscription sub4 = new MockSubscription();
+ Subscription sub4 = newMockSubscription();
assertNull("Should not have been a node present for the subscription", getNodeForSubscription(_subList, sub4));
assertFalse("Removing subscription node should not have succeeded", _subList.remove(sub4));
assertEquals("Unexpected number of nodes", 3, countNodes(_subList));
@@ -324,7 +337,7 @@ public class SubscriptionListTest extends QpidTestCase
assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3));
//add a new 4th subscription to the list
- Subscription sub4 = new MockSubscription();
+ Subscription sub4 = newMockSubscription();
_subList.add(sub4);
//get the node out the list for the 4th subscription
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index d3480e3223..94c6f6aeba 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
@@ -265,16 +266,28 @@ public class ServerSessionDelegate extends SessionDelegate
filterManager,
method.getArguments());
- Subscription sub = new DelegatingSubscription<SubscriptionTarget_0_10>(filterManager, MessageTransferMessage.class,
- method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED,
- method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT,destination,false,target);
-
- target.setSubscription(sub);
-
((ServerSession)session).register(destination, target);
try
{
- queue.registerSubscription(sub, method.getExclusive());
+ EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+ if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ options.add(Subscription.Option.ACQUIRES);
+ }
+ if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ options.add(Subscription.Option.SEES_REQUEUES);
+ }
+ if(method.getExclusive())
+ {
+ options.add(Subscription.Option.EXCLUSIVE);
+ }
+ Subscription sub =
+ queue.registerSubscription(target,
+ filterManager,
+ MessageTransferMessage.class,
+ destination,
+ options);
}
catch (AMQQueue.ExistingExclusiveSubscription existing)
{
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
index c151eddb46..4695f5b04c 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
@@ -93,11 +93,6 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
}
- public void setSubscription(Subscription subscription)
- {
- _subscription = subscription;
- }
-
public Subscription getSubscription()
{
return _subscription;
@@ -570,6 +565,17 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
}
+ @Override
+ public void subscriptionRegistered(final Subscription sub)
+ {
+ _subscription = sub;
+ }
+
+ @Override
+ public void subscriptionRemoved(final Subscription sub)
+ {
+ }
+
public long getUnacknowledgedBytes()
{
return _unacknowledgedBytes.longValue();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 9df1e7b89b..7e8623c171 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -21,19 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -42,6 +30,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -55,6 +44,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
@@ -81,6 +71,7 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -123,7 +114,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private IncomingMessage _currentMessage;
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
+ private final Map<AMQShortString, SubscriptionTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, SubscriptionTarget_0_8>();
private final MessageStore _messageStore;
@@ -498,9 +489,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- public Subscription getSubscription(AMQShortString subscription)
+ public Subscription getSubscription(AMQShortString tag)
{
- return _tag2SubscriptionMap.get(subscription);
+ final SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+ return target == null ? null : target.getSubscription();
}
/**
@@ -526,34 +518,57 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
tag = new AMQShortString("sgen_" + getNextConsumerTag());
}
- if (_tag2SubscriptionMap.containsKey(tag))
+ if (_tag2SubscriptionTargetMap.containsKey(tag))
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
- Subscription subscription =
- SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+ SubscriptionTarget_0_8 target;
+ EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+ if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+ {
+ target = SubscriptionTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ options.add(Subscription.Option.TRANSIENT);
+ }
+ else if(acks)
+ {
+ target = SubscriptionTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+ options.add(Subscription.Option.ACQUIRES);
+ options.add(Subscription.Option.SEES_REQUEUES);
+ }
+ else
+ {
+ target = SubscriptionTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ options.add(Subscription.Option.ACQUIRES);
+ options.add(Subscription.Option.SEES_REQUEUES);
+ }
+
+ if(exclusive)
+ {
+ options.add(Subscription.Option.EXCLUSIVE);
+ }
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
// We add before we register as the Async Delivery process may AutoClose the subscriber
// so calling _cT2QM.remove before we have done put which was after the register succeeded.
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
- _tag2SubscriptionMap.put(tag, subscription);
+ _tag2SubscriptionTargetMap.put(tag, target);
try
{
- queue.registerSubscription(subscription, exclusive);
+ Subscription sub =
+ queue.registerSubscription(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), options);
}
catch (AMQException e)
{
- _tag2SubscriptionMap.remove(tag);
+ _tag2SubscriptionTargetMap.remove(tag);
throw e;
}
catch (RuntimeException e)
{
- _tag2SubscriptionMap.remove(tag);
+ _tag2SubscriptionTargetMap.remove(tag);
throw e;
}
return tag;
@@ -568,7 +583,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
{
- Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+ SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+ Subscription sub = target == null ? null : target.getSubscription();
if (sub != null)
{
try
@@ -634,7 +650,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
if (_logger.isInfoEnabled())
{
- if (!_tag2SubscriptionMap.isEmpty())
+ if (!_tag2SubscriptionTargetMap.isEmpty())
{
_logger.info("Unsubscribing all consumers on channel " + toString());
}
@@ -644,14 +660,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
+ for (Map.Entry<AMQShortString, SubscriptionTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
{
if (_logger.isInfoEnabled())
{
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- Subscription sub = me.getValue();
+ Subscription sub = me.getValue().getSubscription();
try
{
@@ -665,7 +681,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- _tag2SubscriptionMap.clear();
+ _tag2SubscriptionTargetMap.clear();
}
/**
@@ -977,9 +993,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if (wasSuspended)
{
// may need to deliver queued messages
- for (Subscription s : _tag2SubscriptionMap.values())
+ for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
- s.getQueue().deliverAsync(s);
+ s.getSubscription().getQueue().deliverAsync(s.getSubscription());
}
}
@@ -993,15 +1009,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if (!wasSuspended)
{
// may need to deliver queued messages
- for (Subscription s : _tag2SubscriptionMap.values())
+ for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
try
{
- s.getSendLock();
+ s.getSubscription().getSendLock();
}
finally
{
- s.releaseSendLock();
+ s.getSubscription().releaseSendLock();
}
}
}
@@ -1078,10 +1094,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
boolean requiresSuspend = _suspended.compareAndSet(false,true);
// ensure all subscriptions have seen the change to the channel state
- for(Subscription sub : _tag2SubscriptionMap.values())
+ for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getSendLock();
- sub.releaseSendLock();
+ sub.getSubscription().getSendLock();
+ sub.getSubscription().releaseSendLock();
}
try
@@ -1116,9 +1132,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if(requiresSuspend)
{
_suspended.set(false);
- for(Subscription sub : _tag2SubscriptionMap.values())
+ for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getQueue().deliverAsync(sub);
+ sub.getSubscription().getQueue().deliverAsync(sub.getSubscription());
}
}
@@ -1672,6 +1688,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
@Override
public int getConsumerCount()
{
- return _tag2SubscriptionMap.size();
+ return _tag2SubscriptionTargetMap.size();
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
deleted file mode 100644
index 6646dc0cc2..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-
-/**
- * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
- * primarily assists testing although in future more sophisticated subscribers may need a different subscription
- * implementation.
- *
- * @see org.apache.qpid.server.queue.AMQQueue
- */
-public interface SubscriptionFactory
-{
- Subscription createSubscription(int channel,
- AMQProtocolSession protocolSession,
- AMQShortString consumerTag,
- boolean acks,
- FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager) throws AMQException;
-
-
- Subscription createSubscription(AMQChannel channel,
- AMQProtocolSession protocolSession,
- AMQShortString consumerTag,
- boolean acks,
- FieldTable filters,
- boolean noLocal,
- FlowCreditManager creditManager,
- ClientDeliveryMethod clientMethod,
- RecordDeliveryMethod recordMethod) throws AMQException;
-
-
- Subscription createBasicGetNoAckSubscription(AMQChannel channel,
- AMQProtocolSession session,
- AMQShortString consumerTag,
- FieldTable filters,
- boolean noLocal,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod) throws AMQException;
-
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
deleted file mode 100644
index 05f35748ee..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.filter.FilterManagerFactory;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.DelegatingSubscription;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
-
-public class SubscriptionFactoryImpl implements SubscriptionFactory
-{
-
- public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, boolean acks, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager) throws AMQException
- {
- AMQChannel channel = protocolSession.getChannel(channelId);
- if (channel == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
- }
- ClientDeliveryMethod clientMethod = channel.getClientDeliveryMethod();
- RecordDeliveryMethod recordMethod = channel.getRecordDeliveryMethod();
-
-
- return createSubscription(channel, protocolSession, consumerTag, acks, filters,
- noLocal,
- creditManager,
- clientMethod,
- recordMethod
- );
- }
-
-
- public Subscription createSubscription(final AMQChannel channel,
- final AMQProtocolSession protocolSession,
- final AMQShortString consumerTag,
- final boolean acks,
- final FieldTable filters,
- final boolean noLocal,
- final FlowCreditManager creditManager,
- final ClientDeliveryMethod clientMethod,
- final RecordDeliveryMethod recordMethod
- )
- throws AMQException
- {
- boolean isBrowser;
- SubscriptionTarget_0_8 target;
- Subscription subscription;
-
- if (filters != null)
- {
- Boolean isBrowserObj = (Boolean) filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
- isBrowser = (isBrowserObj != null) && isBrowserObj.booleanValue();
- }
- else
- {
- isBrowser = false;
- }
-
- final FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
- boolean acquires;
- boolean seesReuques;
- boolean isTransient;
- if(isBrowser)
- {
- target = new SubscriptionTarget_0_8.BrowserSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod);
- acquires = false;
- seesReuques = false;
- isTransient = true;
- }
- else if(acks)
- {
- target = new SubscriptionTarget_0_8.AckSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod);
- acquires = true;
- seesReuques = true;
- isTransient = false;
- }
- else
- {
- target = new SubscriptionTarget_0_8.NoAckSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod);
- acquires = true;
- seesReuques = true;
- isTransient = false;
- }
- subscription =
- new DelegatingSubscription<SubscriptionTarget_0_8>(filterManager, AMQMessage.class, acquires, seesReuques, AMQShortString.toString(consumerTag),isTransient,target);
- target.setSubscription(subscription);
- return subscription;
- }
-
-
-
- public Subscription createBasicGetNoAckSubscription(final AMQChannel channel,
- final AMQProtocolSession session,
- final AMQShortString consumerTag,
- final FieldTable filters,
- final boolean noLocal,
- final FlowCreditManager creditManager,
- final ClientDeliveryMethod deliveryMethod,
- final RecordDeliveryMethod recordMethod) throws AMQException
- {
- SubscriptionTarget_0_8 target = new SubscriptionTarget_0_8.NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
-
- Subscription subscription;
- final FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
- subscription =
- new DelegatingSubscription<SubscriptionTarget_0_8>(filterManager, AMQMessage.class, true, true, AMQShortString.toString(consumerTag),true,target);
- target.setSubscription(subscription);
- return subscription;
- }
-
- public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
-
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
index 6c91e6130e..f7acad87a1 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
@@ -76,6 +76,13 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
private Subscription _subscription;
+ public static SubscriptionTarget_0_8 createBrowserTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager) throws AMQException
+ {
+ return new BrowserSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ }
+
static final class BrowserSubscription extends SubscriptionTarget_0_8
{
public BrowserSubscription(AMQChannel channel,
@@ -120,6 +127,22 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
+ public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager) throws AMQException
+ {
+ return new NoAckSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ }
+
+ public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException
+ {
+ return new NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
public static class NoAckSubscription extends SubscriptionTarget_0_8
{
private final AutoCommitTransaction _txn;
@@ -220,6 +243,26 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
+
+ public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager)
+ throws AMQException
+ {
+ return new AckSubscription(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ }
+
+
+ public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ return new AckSubscription(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
+ }
+
static final class AckSubscription extends SubscriptionTarget_0_8
{
public AckSubscription(AMQChannel channel,
@@ -317,14 +360,20 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
}
- public void setSubscription(Subscription subscription)
+ public Subscription getSubscription()
{
- _subscription = subscription;
+ return _subscription;
}
- public Subscription getSubscription()
+ @Override
+ public void subscriptionRemoved(final Subscription sub)
{
- return _subscription;
+ }
+
+ @Override
+ public void subscriptionRegistered(final Subscription sub)
+ {
+ _subscription = sub;
}
public AMQSessionModel getSessionModel()
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index a48ae3826e..d5f7aad93d 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -24,6 +24,7 @@ package org.apache.qpid.server.protocol.v0_8.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
@@ -33,8 +34,10 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.SubscriptionTarget_0_8;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -42,9 +45,10 @@ import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.EnumSet;
+
public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
{
private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
@@ -151,17 +155,24 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
};
- Subscription sub;
+ SubscriptionTarget_0_8 target;
+ EnumSet<Subscription.Option> options = EnumSet.of(Subscription.Option.TRANSIENT, Subscription.Option.ACQUIRES,
+ Subscription.Option.SEES_REQUEUES);
if(acks)
{
- sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+
+ target = SubscriptionTarget_0_8.createAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
else
{
- sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ target = SubscriptionTarget_0_8.createNoAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
- queue.registerSubscription(sub,false);
+ Subscription sub = queue.registerSubscription(target, null, AMQMessage.class, "", options);
sub.flush();
queue.unregisterSubscription(sub);
return(!singleMessageCredit.hasCredit());
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 2243cbff11..edb60a0c6c 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Set;
/**
@@ -47,6 +48,7 @@ import java.util.Set;
*/
public class AckTest extends QpidTestCase
{
+ private SubscriptionTarget_0_8 _subscriptionTarget;
private Subscription _subscription;
private AMQProtocolSession _protocolSession;
@@ -86,7 +88,6 @@ public class AckTest extends QpidTestCase
private void publishMessages(int count, boolean persistent) throws AMQException
{
- _queue.registerSubscription(_subscription,false);
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -178,7 +179,10 @@ public class AckTest extends QpidTestCase
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
+ _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -202,7 +206,16 @@ public class AckTest extends QpidTestCase
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
+ _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget,
+ null,
+ AMQMessage.class,
+ DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -218,7 +231,13 @@ public class AckTest extends QpidTestCase
public void testPersistentNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES, Subscription.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -235,7 +254,15 @@ public class AckTest extends QpidTestCase
*/
public void testSingleAckReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -264,7 +291,15 @@ public class AckTest extends QpidTestCase
*/
public void testMultiAckReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -290,7 +325,15 @@ public class AckTest extends QpidTestCase
*/
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -319,8 +362,12 @@ public class AckTest extends QpidTestCase
// Send 10 messages
Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
- DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+
+ _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
+
final int msgCount = 1;
publishMessages(msgCount);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
index 36a57fa05f..88702e66f4 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
@@ -37,6 +37,9 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
*
@@ -105,9 +108,11 @@ public class ExtractResendAndRequeueTest extends TestCase
*/
private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList)
{
- Subscription subscription = new MockSubscription();
+ Subscription subscription = mock(Subscription.class);
+ when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
+ when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
- // Aquire messages in subscription
+ // Acquire messages in subscription
for (QueueEntry entry : messageList)
{
entry.acquire(subscription);
@@ -157,7 +162,7 @@ public class ExtractResendAndRequeueTest extends TestCase
Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList);
// Close subscription
- subscription.close();
+ when(subscription.isClosed()).thenReturn(true);
final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java
deleted file mode 100644
index e0d1b28007..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.logging.UnitTestMessageLogger;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class SubscriptionFactoryImplTest extends QpidTestCase
-{
- private AMQChannel _channel;
- private AMQProtocolSession _session;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _channel = BrokerTestHelper_0_8.createChannel();
- _session = _channel.getProtocolSession();
- GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false));
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (_channel != null)
- {
- _channel.getVirtualHost().close();
- }
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- /**
- * Tests that while creating Subscriptions of various types, the
- * ID numbers assigned are allocated from a common sequence
- * (in increasing order).
- */
- public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception
- {
- //create a No-Ack subscription, get the first Subscription ID
- long previousId = 0;
- Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager());
- previousId = noAckSub.getSubscriptionID();
-
- //create an ack subscription, verify the next Subscription ID is used
- Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager());
- assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID());
- previousId = ackSub.getSubscriptionID();
-
- //create a browser subscription
- FieldTable filters = new FieldTable();
- filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- Subscription browserSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager());
- assertEquals("Unexpected Subscription ID allocated", previousId + 1, browserSub.getSubscriptionID());
- previousId = browserSub.getSubscriptionID();
-
- //create an BasicGet NoAck subscription
- Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false,
- _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod());
- assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID());
- previousId = getNoAckSub.getSubscriptionID();
-
- }
-
-}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index c590a18823..264c662364 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -110,6 +111,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
QueueDestination qd = null;
AMQQueue queue = null;
+ EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
boolean noLocal = false;
@@ -173,14 +175,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
_target = new SubscriptionTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
- _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter),
- Message_1_0.class,
- source.getDistributionMode() != StdDistMode.COPY,
- source.getDistributionMode() != StdDistMode.COPY,
- getEndpoint().getName(),
- false,
- _target);
- _target.setSubscription(_subscription);
+ if(source.getDistributionMode() != StdDistMode.COPY)
+ {
+ options.add(Subscription.Option.ACQUIRES);
+ options.add(Subscription.Option.SEES_REQUEUES);
+ }
}
else if(destination instanceof ExchangeDestination)
@@ -373,26 +372,27 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_target = new SubscriptionTarget_1_0(this, true);
- _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter),
- Message_1_0.class,
- true,
- true,
- getEndpoint().getName(),
- false,
- _target);
- _target.setSubscription(_subscription);
+ options.add(Subscription.Option.ACQUIRES);
+ options.add(Subscription.Option.SEES_REQUEUES);
}
- if(_subscription != null)
+ if(_target != null)
{
+ if(noLocal)
+ {
+ options.add(Subscription.Option.NO_LOCAL);
+ }
+
+
_subscription.setNoLocal(noLocal);
try
{
-
- queue.registerSubscription(_subscription, false);
+ _subscription = queue.registerSubscription(_target,
+ messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ Message_1_0.class, getEndpoint().getName(), options);
}
catch (AMQException e)
{
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
index ebff2d2ee7..55c8407ea5 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
@@ -74,11 +74,6 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
_acquires = acquires;
}
- public void setSubscription(Subscription sub)
- {
- _subscription = sub;
- }
-
public Subscription getSubscription()
{
return _subscription;
@@ -505,6 +500,18 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
}
@Override
+ public void subscriptionRegistered(final Subscription sub)
+ {
+ _subscription = sub;
+ }
+
+ @Override
+ public void subscriptionRemoved(final Subscription sub)
+ {
+
+ }
+
+ @Override
public long getUnacknowledgedBytes()
{
// TODO