diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-03 23:28:12 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-03 23:28:12 +0000 |
commit | 7ec70e560ee9a446a060d168b04a30ebc305c75c (patch) | |
tree | bd8aeefe542b432c6a37950e35ca4ca6a0baeeb6 | |
parent | 7a690b854f6492e4d517320598d93f4d30081e28 (diff) | |
download | qpid-python-7ec70e560ee9a446a060d168b04a30ebc305c75c.tar.gz |
Change subscription registration for queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564130 13f79535-47bb-0310-9956-ffa450edef68
25 files changed, 554 insertions, 641 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index f6602c8071..d822705a39 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -25,15 +25,19 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeReferrer; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionTarget; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import java.util.Set; @@ -84,7 +88,9 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa VirtualHost getVirtualHost(); - void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException; + Subscription registerSubscription(final SubscriptionTarget target, final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, EnumSet<Subscription.Option> options) throws AMQException; void unregisterSubscription(final Subscription subscription) throws AMQException; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 5f79498beb..d73bd638b8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -18,15 +18,7 @@ */ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -43,6 +35,7 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -55,9 +48,11 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager; import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager; +import org.apache.qpid.server.subscription.DelegatingSubscription; import org.apache.qpid.server.subscription.MessageGroupManager; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; +import org.apache.qpid.server.subscription.SubscriptionTarget; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -390,7 +385,25 @@ public class SimpleAMQQueue implements AMQQueue, // ------ Manage Subscriptions - public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) + + @Override + public Subscription registerSubscription(final SubscriptionTarget target, + final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + EnumSet<Subscription.Option> optionSet) throws AMQException + { + + DelegatingSubscription sub = new DelegatingSubscription(filters, messageClass, + optionSet.contains(Subscription.Option.ACQUIRES), + optionSet.contains(Subscription.Option.SEES_REQUEUES), + consumerName, optionSet.contains(Subscription.Option.TRANSIENT), target); + target.subscriptionRegistered(sub); + registerSubscription(sub, optionSet.contains(Subscription.Option.EXCLUSIVE)); + return sub; + } + + private synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive { // Access control @@ -479,6 +492,7 @@ public class SimpleAMQQueue implements AMQQueue, setExclusiveSubscriber(null); subscription.setQueueContext(null); + if(_messageGroupManager != null) { resetSubPointersForGroups(subscription, true); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java index 213d8b7730..9b97f87d04 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java @@ -178,7 +178,7 @@ public abstract class AbstractSubscription implements Subscription else { // no interest in messages we can't convert - if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null) + if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null) { return false; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java index 623371c84c..602cabcf75 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java @@ -143,6 +143,7 @@ public class DelegatingSubscription<T extends SubscriptionTarget> extends Abstra public void close() { _target.close(); + _target.subscriptionRemoved(this); } @Override diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java index f9278cf719..03a56f7dc2 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -32,6 +32,15 @@ public interface Subscription { AtomicLong SUB_ID_GENERATOR = new AtomicLong(0); + enum Option + { + ACQUIRES, + SEES_REQUEUES, + TRANSIENT, + EXCLUSIVE, + NO_LOCAL + } + LogActor getLogActor(); boolean isTransient(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java index 0b0be38f42..bdcd0c15fb 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java @@ -36,6 +36,10 @@ public interface SubscriptionTarget State getState(); + void subscriptionRegistered(Subscription sub); + + void subscriptionRemoved(Subscription sub); + void setStateListener(StateChangeListener<SubscriptionTarget, State> listener); long getUnacknowledgedBytes(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java index a0a2a7b648..6b59f65940 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java @@ -20,11 +20,17 @@ */ package org.apache.qpid.server.logging.actors; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.BrokerTestHelper; import java.util.List; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Test : AMQPConnectionActorTest * Validate the AMQPConnectionActor class. @@ -42,9 +48,11 @@ public class SubscriptionActorTest extends BaseConnectionActorTestCase { super.setUp(); - MockSubscription mockSubscription = new MockSubscription(); + Subscription mockSubscription = mock(Subscription.class); + final AMQQueue queue = BrokerTestHelper.createQueue(getName(), getVirtualHost()); + when(mockSubscription.getQueue()).thenReturn(queue); + when(mockSubscription.getSubscriptionID()).thenReturn(0l); - mockSubscription.setQueue(BrokerTestHelper.createQueue(getName(), getVirtualHost()), false); setAmqpActor(new SubscriptionActor(getRootLogger(), mockSubscription)); } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index a468fa072b..1947a4e3b6 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -28,7 +28,10 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import java.util.ArrayList; +import java.util.EnumSet; + import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.subscription.Subscription; import static org.mockito.Mockito.when; @@ -62,7 +65,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest queue.enqueue(createMessage(9L, (byte) 0)); // Register subscriber - queue.registerSubscription(getSubscription(), false); + queue.registerSubscription(getSubscription(), null, null, "test", EnumSet.noneOf(Subscription.Option.class)); Thread.sleep(150); ArrayList<QueueEntry> msgs = getSubscription().getMessages(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 98ecdcdd3b..e8b644045a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -24,16 +24,20 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.subscription.DelegatingSubscription; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionTarget; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Set; import java.util.UUID; @@ -203,16 +207,24 @@ public class MockAMQQueue implements AMQQueue return _virtualhost; } - public String getName() + @Override + public Subscription registerSubscription(final SubscriptionTarget target, + final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + final EnumSet<Subscription.Option> options) throws AMQException { - return _name; + return new DelegatingSubscription(filters, messageClass, options.contains(Subscription.Option.ACQUIRES), + options.contains(Subscription.Option.SEES_REQUEUES), consumerName, + options.contains(Subscription.Option.TRANSIENT), target ); } - public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException + public String getName() { - + return _name; } + public void unregisterSubscription(Subscription subscription) throws AMQException { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 2b1e7f5e1f..2753a89bf4 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -113,11 +113,19 @@ public abstract class QueueEntryImplTestBase extends TestCase */ private void acquire() { - _queueEntry.acquire(new MockSubscription()); + _queueEntry.acquire(newMockSubscription()); assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", _queueEntry.isAcquired()); } + private Subscription newMockSubscription() + { + final Subscription subscription = mock(Subscription.class); + when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription)); + when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement()); + return subscription; + } + /** * A helper method to get entry state * @@ -145,7 +153,7 @@ public abstract class QueueEntryImplTestBase extends TestCase */ public void testRejectAndRejectedBy() { - Subscription sub = new MockSubscription(); + Subscription sub = newMockSubscription(); long subId = sub.getSubscriptionID(); assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); @@ -160,7 +168,7 @@ public abstract class QueueEntryImplTestBase extends TestCase assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); //repeat rejection using a second subscription - Subscription sub2 = new MockSubscription(); + Subscription sub2 = newMockSubscription(); long sub2Id = sub2.getSubscriptionID(); assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index ffd64774c0..3bb7102d2a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -29,6 +29,8 @@ import static org.mockito.Matchers.contains; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; +import java.util.Arrays; +import java.util.EnumSet; import java.util.Map; import org.apache.log4j.Logger; @@ -44,6 +46,7 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionTarget; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -65,7 +68,8 @@ public class SimpleAMQQueueTest extends QpidTestCase private String _owner = "owner"; private String _routingKey = "routing key"; private DirectExchange _exchange; - private MockSubscription _subscription = new MockSubscription(); + private MockSubscription _subscriptionTarget = new MockSubscription(); + private Subscription _subscription; private Map<String,Object> _arguments = null; @Override @@ -159,17 +163,17 @@ public class SimpleAMQQueueTest extends QpidTestCase public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException { + ServerMessage messageA = createMessage(new Long(24)); + // Check adding a subscription adds it to the queue - _queue.registerSubscription(_subscription, false); - assertEquals("Subscription did not get queue", _queue, - _subscription.getQueue()); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Subscription.Option.class)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); + _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber - ServerMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); try { @@ -179,14 +183,14 @@ public class SimpleAMQQueueTest extends QpidTestCase { } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull(((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertNull(((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); // Check removing the subscription removes it's information from the queue _queue.unregisterSubscription(_subscription); - assertTrue("Subscription still had queue", _subscription.isClosed()); + assertTrue("Subscription still had queue", _subscriptionTarget.isClosed()); assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); assertFalse("Queue still has active consumer", - 1 == _queue.getActiveConsumerCount()); + 1 == _queue.getActiveConsumerCount()); ServerMessage messageB = createMessage(new Long (25)); _queue.enqueue(messageB); @@ -198,10 +202,11 @@ public class SimpleAMQQueueTest extends QpidTestCase { ServerMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); - _queue.registerSubscription(_subscription, false); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Subscription.Option.class)); Thread.sleep(150); assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertNull("There should be no releasedEntry after an enqueue", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); } /** @@ -213,10 +218,11 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage messageB = createMessage(new Long(25)); _queue.enqueue(messageA); _queue.enqueue(messageB); - _queue.registerSubscription(_subscription, false); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Subscription.Option.class)); Thread.sleep(150); assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertNull("There should be no releasedEntry after enqueues", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); } /** @@ -225,7 +231,15 @@ public class SimpleAMQQueueTest extends QpidTestCase */ public void testReleasedMessageIsResentToSubscriber() throws Exception { - _queue.registerSubscription(_subscription, false); + + ServerMessage messageA = createMessage(new Long(24)); + ServerMessage messageB = createMessage(new Long(25)); + ServerMessage messageC = createMessage(new Long(26)); + + + _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", + EnumSet.of(Subscription.Option.ACQUIRES, + Subscription.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() @@ -236,10 +250,6 @@ public class SimpleAMQQueueTest extends QpidTestCase } }; - ServerMessage messageA = createMessage(new Long(24)); - ServerMessage messageB = createMessage(new Long(25)); - ServerMessage messageC = createMessage(new Long(26)); - /* Enqueue three messages */ _queue.enqueue(messageA, postEnqueueAction); @@ -248,7 +258,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); @@ -259,11 +269,11 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription", 4, _subscriptionTarget.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); } /** @@ -273,7 +283,11 @@ public class SimpleAMQQueueTest extends QpidTestCase */ public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception { - _queue.registerSubscription(_subscription, false); + ServerMessage messageA = createMessage(new Long(24)); + + _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", + EnumSet.of(Subscription.Option.SEES_REQUEUES, + Subscription.Option.ACQUIRES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() @@ -286,7 +300,6 @@ public class SimpleAMQQueueTest extends QpidTestCase /* Enqueue one message with expiration set for a short time in the future */ - ServerMessage messageA = createMessage(new Long(24)); int messageExpirationOffset = 200; final long expiration = System.currentTimeMillis() + messageExpirationOffset; when(messageA.getExpiration()).thenReturn(expiration); @@ -296,7 +309,7 @@ public class SimpleAMQQueueTest extends QpidTestCase int subFlushWaitTime = 150; Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription", 1, _subscriptionTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */ @@ -306,9 +319,9 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); - assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size()); + assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); } @@ -320,7 +333,14 @@ public class SimpleAMQQueueTest extends QpidTestCase */ public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception { - _queue.registerSubscription(_subscription, false); + + ServerMessage messageA = createMessage(new Long(24)); + ServerMessage messageB = createMessage(new Long(25)); + ServerMessage messageC = createMessage(new Long(26)); + + _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", + EnumSet.of(Subscription.Option.ACQUIRES, + Subscription.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() @@ -331,10 +351,6 @@ public class SimpleAMQQueueTest extends QpidTestCase } }; - ServerMessage messageA = createMessage(new Long(24)); - ServerMessage messageB = createMessage(new Long(25)); - ServerMessage messageC = createMessage(new Long(26)); - /* Enqueue three messages */ _queue.enqueue(messageA, postEnqueueAction); @@ -343,7 +359,7 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); @@ -355,11 +371,11 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size()); + assertEquals("Unexpected total number of messages sent to subscription", 5, _subscriptionTarget.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); } @@ -369,11 +385,21 @@ public class SimpleAMQQueueTest extends QpidTestCase */ public void testReleaseForQueueWithMultipleSubscriptions() throws Exception { - MockSubscription subscription1 = new MockSubscription(); - MockSubscription subscription2 = new MockSubscription(); + ServerMessage messageA = createMessage(new Long(24)); + ServerMessage messageB = createMessage(new Long(25)); + + MockSubscription target1 = new MockSubscription(); + MockSubscription target2 = new MockSubscription(); + + + Subscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test", + EnumSet.of(Subscription.Option.ACQUIRES, + Subscription.Option.SEES_REQUEUES)); + + Subscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test", + EnumSet.of(Subscription.Option.ACQUIRES, + Subscription.Option.SEES_REQUEUES)); - _queue.registerSubscription(subscription1, false); - _queue.registerSubscription(subscription2, false); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() @@ -384,8 +410,6 @@ public class SimpleAMQQueueTest extends QpidTestCase } }; - ServerMessage messageA = createMessage(new Long(24)); - ServerMessage messageB = createMessage(new Long(25)); /* Enqueue two messages */ @@ -394,31 +418,36 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size()); + assertEquals("Unexpected total number of messages sent to both after enqueue", + 2, + target1.getMessages().size() + target2.getMessages().size()); /* Now release the first message only, causing it to be requeued */ queueEntries.get(0).release(); Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); + assertEquals("Unexpected total number of messages sent to both subscriptions after release", + 3, + target1.getMessages().size() + target2.getMessages().size()); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry()); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry()); } public void testExclusiveConsumer() throws AMQException { + ServerMessage messageA = createMessage(new Long(24)); // Check adding an exclusive subscription adds it to the queue - _queue.registerSubscription(_subscription, true); - assertEquals("Subscription did not get queue", _queue, - _subscription.getQueue()); + + _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", + EnumSet.of(Subscription.Option.EXCLUSIVE)); + assertEquals("Queue does not have consumer", 1, - _queue.getConsumerCount()); + _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); + _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber - ServerMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); try { @@ -430,11 +459,14 @@ public class SimpleAMQQueueTest extends QpidTestCase assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue - Subscription subB = new MockSubscription(); + MockSubscription subB = new MockSubscription(); Exception ex = null; try { - _queue.registerSubscription(subB, false); + + _queue.registerSubscription(subB, null, messageA.getClass(), "test", + EnumSet.noneOf(Subscription.Option.class)); + } catch (AMQException e) { @@ -445,10 +477,15 @@ public class SimpleAMQQueueTest extends QpidTestCase // Check we cannot add an exclusive subscriber to a queue with an // existing subscription _queue.unregisterSubscription(_subscription); - _queue.registerSubscription(_subscription, false); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Subscription.Option.class)); + try { - _queue.registerSubscription(subB, true); + + _subscription = _queue.registerSubscription(subB, null, messageA.getClass(), "test", + EnumSet.of(Subscription.Option.EXCLUSIVE)); + } catch (AMQException e) { @@ -462,8 +499,11 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.stop(); _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); _queue.setDeleteOnNoConsumers(true); - _queue.registerSubscription(_subscription, false); - ServerMessage message = createMessage(new Long(25)); + + ServerMessage message = createMessage(new Long(25)); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test", + EnumSet.noneOf(Subscription.Option.class)); + _queue.enqueue(message); _queue.unregisterSubscription(_subscription); assertTrue("Queue was not deleted when subscription was removed", @@ -472,9 +512,12 @@ public class SimpleAMQQueueTest extends QpidTestCase public void testResend() throws Exception { - _queue.registerSubscription(_subscription, false); Long id = new Long(26); ServerMessage message = createMessage(id); + + _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test", + EnumSet.noneOf(Subscription.Option.class)); + _queue.enqueue(message); QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry(); entry.setRedelivered(); @@ -649,18 +692,21 @@ public class SimpleAMQQueueTest extends QpidTestCase // in.Bias over 50% of the messages to the first subscription so that // the later subscriptions reject them and report being done before // the first subscription as the processQueue method proceeds. - List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3); - List<QueueEntry> msgListSub2 = createEntriesList(msg4); - List<QueueEntry> msgListSub3 = createEntriesList(msg5); + List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3); + List<String> msgListSub2 = createEntriesList(msg4); + List<String> msgListSub3 = createEntriesList(msg5); MockSubscription sub1 = new MockSubscription(msgListSub1); MockSubscription sub2 = new MockSubscription(msgListSub2); MockSubscription sub3 = new MockSubscription(msgListSub3); // register the subscriptions - testQueue.registerSubscription(sub1, false); - testQueue.registerSubscription(sub2, false); - testQueue.registerSubscription(sub3, false); + testQueue.registerSubscription(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); + testQueue.registerSubscription(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); + testQueue.registerSubscription(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); //check that no messages have been delivered to the //subscriptions during registration @@ -680,9 +726,9 @@ public class SimpleAMQQueueTest extends QpidTestCase }); // check expected messages delivered to correct consumers - verifyReceivedMessages(msgListSub1, sub1.getMessages()); - verifyReceivedMessages(msgListSub2, sub2.getMessages()); - verifyReceivedMessages(msgListSub3, sub3.getMessages()); + verifyReceivedMessages(Arrays.asList(msg1,msg2,msg3), sub1.getMessages()); + verifyReceivedMessages(Collections.singletonList(msg4), sub2.getMessages()); + verifyReceivedMessages(Collections.singletonList(msg5), sub3.getMessages()); } /** @@ -883,7 +929,7 @@ public class SimpleAMQQueueTest extends QpidTestCase try { // subscribe - testQueue.registerSubscription(subscription, false); + testQueue.registerSubscription(subscription, null, entries.get(0).getMessage().getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); // process queue testQueue.processQueue(new QueueRunner(testQueue) @@ -907,7 +953,7 @@ public class SimpleAMQQueueTest extends QpidTestCase { Thread.currentThread().interrupt(); } - List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3)); + List<QueueEntry> expected = Arrays.asList(entries.get(0), entries.get(2), entries.get(3)); verifyReceivedMessages(expected, subscription.getMessages()); } @@ -970,7 +1016,7 @@ public class SimpleAMQQueueTest extends QpidTestCase // register subscription try { - queue.registerSubscription(subscription, false); + queue.registerSubscription(subscription, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); } catch (AMQException e) { @@ -997,52 +1043,51 @@ public class SimpleAMQQueueTest extends QpidTestCase //verify adding an active subscription increases the count final MockSubscription subscription1 = new MockSubscription(); subscription1.setActive(true); + subscription1.setState(SubscriptionTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - queue.registerSubscription(subscription1, false); + queue.registerSubscription(subscription1, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify adding an inactive subscription doesn't increase the count final MockSubscription subscription2 = new MockSubscription(); subscription2.setActive(false); + subscription2.setState(SubscriptionTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - queue.registerSubscription(subscription2, false); + queue.registerSubscription(subscription2, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify behaviour in face of expected state changes: //verify a subscription going suspended->active increases the count - queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE); + subscription2.setState(SubscriptionTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount()); //verify a subscription going active->suspended decreases the count - queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED); + subscription2.setState(SubscriptionTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going suspended->closed doesn't change the count - queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - - //verify a subscription going active->closed decreases the count - queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED); - assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - - //verify behaviour in face of unexpected state changes: - - //verify a subscription going closed->active increases the count - queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE); + subscription2.setState(SubscriptionTarget.State.CLOSED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify a subscription going active->active doesn't change the count - queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE); + subscription1.setState(SubscriptionTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify a subscription going closed->suspended doesn't change the count - queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + subscription1.setState(SubscriptionTarget.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); //verify a subscription going suspended->suspended doesn't change the count - queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED); + subscription1.setState(SubscriptionTarget.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + + subscription1.setState(SubscriptionTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a subscription going active->closed decreases the count + subscription1.setState(SubscriptionTarget.State.CLOSED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + } public void testNotificationFiredOnEnqueue() throws Exception @@ -1167,12 +1212,12 @@ public class SimpleAMQQueueTest extends QpidTestCase return entry; } - private List<QueueEntry> createEntriesList(QueueEntry... entries) + private List<String> createEntriesList(QueueEntry... entries) { - ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>(); + ArrayList<String> entriesList = new ArrayList<String>(); for (QueueEntry entry : entries) { - entriesList.add(entry); + entriesList.add(entry.getMessage().getMessageHeader().getMessageId()); } return entriesList; } @@ -1197,7 +1242,7 @@ public class SimpleAMQQueueTest extends QpidTestCase public MockSubscription getSubscription() { - return _subscription; + return _subscriptionTarget; } public Map<String,Object> getArguments() @@ -1213,14 +1258,15 @@ public class SimpleAMQQueueTest extends QpidTestCase protected ServerMessage createMessage(Long id) throws AMQException { + AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getMessageId()).thenReturn(String.valueOf(id)); ServerMessage message = mock(ServerMessage.class); when(message.getMessageNumber()).thenReturn(id); + when(message.getMessageHeader()).thenReturn(header); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); - AMQMessageHeader hdr = mock(AMQMessageHeader.class); - when(message.getMessageHeader()).thenReturn(hdr); when(message.newReference()).thenReturn(ref); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index eec1edca35..2e52d47326 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -23,6 +23,10 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.filter.MessageFilter; +import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Port; @@ -42,36 +46,34 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public class MockSubscription implements Subscription +public class MockSubscription implements SubscriptionTarget { + private final List<String> _messageIds; private boolean _closed = false; private String tag = "mocktag"; private AMQQueue queue = null; - private StateChangeListener<Subscription, State> _listener = null; - private volatile AMQQueue.Context _queueContext = null; + private StateChangeListener<SubscriptionTarget, State> _listener = null; private State _state = State.ACTIVE; private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); private final Lock _stateChangeLock = new ReentrantLock(); - private List<QueueEntry> _acceptEntries = null; - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private static final AtomicLong idGenerator = new AtomicLong(0); // Create a simple ID that increments for ever new Subscription - private final long _subscriptionID = idGenerator.getAndIncrement(); private boolean _isActive = true; + private Subscription _subscription; public MockSubscription() { + _messageIds = null; } - public MockSubscription(List<QueueEntry> acceptEntries) + public MockSubscription(List<String> messageIds) { - _acceptEntries = acceptEntries; + _messageIds = messageIds; } - public void close() + public boolean close() { _closed = true; if (_listener != null) @@ -79,6 +81,7 @@ public class MockSubscription implements Subscription _listener.stateChanged(this, _state, State.CLOSED); } _state = State.CLOSED; + return true; } public String getName() @@ -86,45 +89,26 @@ public class MockSubscription implements Subscription return tag; } - @Override - public void flush() throws AMQException - { - - } - - public long getSubscriptionID() - { - return _subscriptionID; - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public LogActor getLogActor() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isTransient() - { - return false; - } - - public long getBytesOut() - { - return 0; // TODO - Implement - } - - public long getMessagesOut() + public FilterManager getFilters() { - return 0; // TODO - Implement + if(_messageIds != null) + { + SimpleFilterManager filters = new SimpleFilterManager(); + filters.add(new MessageFilter() + { + @Override + public boolean matches(final Filterable message) + { + final String messageId = message.getMessageHeader().getMessageId(); + return _messageIds.contains(messageId); + } + }); + return filters; + } + else + { + return null; + } } public long getUnacknowledgedBytes() @@ -147,62 +131,18 @@ public class MockSubscription implements Subscription return new MockSessionModel(); } - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public boolean hasInterest(QueueEntry entry) - { - if(_acceptEntries != null) - { - //simulate selector behaviour, only signal - //interest in the dictated queue entries - return _acceptEntries.contains(entry); - } - - return true; - } - public boolean isActive() { return _isActive ; } - public void set(String key, Object value) - { - } - public Object get(String key) - { - return null; - } - - public boolean isAutoClose() - { - return false; - } public boolean isClosed() { return _closed; } - public boolean acquires() - { - return true; - } - - public boolean seesRequeues() - { - return true; - } public boolean isSuspended() { @@ -213,11 +153,6 @@ public class MockSubscription implements Subscription { } - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - public void restoreCredit(QueueEntry queueEntry) { } @@ -236,33 +171,37 @@ public class MockSubscription implements Subscription } - public void setQueueContext(AMQQueue.Context queueContext) + public State getState() { - _queueContext = queueContext; + return _state; } - public void setQueue(AMQQueue queue, boolean exclusive) + @Override + public void subscriptionRegistered(final Subscription sub) { - this.queue = queue; + _subscription = sub; } - public void setNoLocal(boolean noLocal) + @Override + public void subscriptionRemoved(final Subscription sub) { - } - public void setStateListener(StateChangeListener<Subscription, State> listener) - { - this._listener = listener; } - public State getState() + public void setState(State state) { - return _state; + State oldState = _state; + _state = state; + if(_listener != null) + { + _listener.stateChanged(this, oldState, state); + } } - public boolean wouldSuspend(QueueEntry msg) + @Override + public void setStateListener(final StateChangeListener<SubscriptionTarget, State> listener) { - return false; + _listener = listener; } public ArrayList<QueueEntry> getMessages() @@ -270,13 +209,15 @@ public class MockSubscription implements Subscription return messages; } - public boolean isSessionTransactional() + + public void queueEmpty() throws AMQException { - return false; } - public void queueEmpty() throws AMQException + @Override + public boolean allocateCredit(final QueueEntry msg) { + return true; } public void setActive(final boolean isActive) diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java index cd5b178464..4af28de307 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java @@ -20,25 +20,29 @@ */ package org.apache.qpid.server.subscription; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.SubscriptionList.SubscriptionNode; import org.apache.qpid.server.subscription.SubscriptionList.SubscriptionNodeIterator; import org.apache.qpid.test.utils.QpidTestCase; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class SubscriptionListTest extends QpidTestCase { private SubscriptionList _subList; - private MockSubscription _sub1; - private MockSubscription _sub2; - private MockSubscription _sub3; + private Subscription _sub1; + private Subscription _sub2; + private Subscription _sub3; private SubscriptionNode _node; protected void setUp() { _subList = new SubscriptionList(); - _sub1 = new MockSubscription(); - _sub2 = new MockSubscription(); - _sub3 = new MockSubscription(); + _sub1 = newMockSubscription(); + _sub2 = newMockSubscription(); + _sub3 = newMockSubscription(); _subList.add(_sub1); _subList.add(_sub2); @@ -47,6 +51,15 @@ public class SubscriptionListTest extends QpidTestCase _node = _subList.getHead(); } + + private Subscription newMockSubscription() + { + final Subscription subscription = mock(Subscription.class); + when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription)); + when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement()); + return subscription; + } + /** * Test that if the first (non-head) node in the list is deleted (but is still present), * it is not returned when searching through the list for the next viable node, and the @@ -177,9 +190,9 @@ public class SubscriptionListTest extends QpidTestCase assertEquals("Unexpected size result", 0, subList.size()); - Subscription sub1 = new MockSubscription(); - Subscription sub2 = new MockSubscription(); - Subscription sub3 = new MockSubscription(); + Subscription sub1 = newMockSubscription(); + Subscription sub2 = newMockSubscription(); + Subscription sub3 = newMockSubscription(); subList.add(sub1); assertEquals("Unexpected size result", 1, subList.size()); @@ -253,7 +266,7 @@ public class SubscriptionListTest extends QpidTestCase */ public void testRemoveNonexistentNode() { - Subscription sub4 = new MockSubscription(); + Subscription sub4 = newMockSubscription(); assertNull("Should not have been a node present for the subscription", getNodeForSubscription(_subList, sub4)); assertFalse("Removing subscription node should not have succeeded", _subList.remove(sub4)); assertEquals("Unexpected number of nodes", 3, countNodes(_subList)); @@ -324,7 +337,7 @@ public class SubscriptionListTest extends QpidTestCase assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3)); //add a new 4th subscription to the list - Subscription sub4 = new MockSubscription(); + Subscription sub4 = newMockSubscription(); _subList.add(sub4); //get the node out the list for the 4th subscription diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index d3480e3223..94c6f6aeba 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; @@ -265,16 +266,28 @@ public class ServerSessionDelegate extends SessionDelegate filterManager, method.getArguments()); - Subscription sub = new DelegatingSubscription<SubscriptionTarget_0_10>(filterManager, MessageTransferMessage.class, - method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED, - method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT,destination,false,target); - - target.setSubscription(sub); - ((ServerSession)session).register(destination, target); try { - queue.registerSubscription(sub, method.getExclusive()); + EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class); + if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED) + { + options.add(Subscription.Option.ACQUIRES); + } + if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT) + { + options.add(Subscription.Option.SEES_REQUEUES); + } + if(method.getExclusive()) + { + options.add(Subscription.Option.EXCLUSIVE); + } + Subscription sub = + queue.registerSubscription(target, + filterManager, + MessageTransferMessage.class, + destination, + options); } catch (AMQQueue.ExistingExclusiveSubscription existing) { diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java index c151eddb46..4695f5b04c 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java @@ -93,11 +93,6 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen } - public void setSubscription(Subscription subscription) - { - _subscription = subscription; - } - public Subscription getSubscription() { return _subscription; @@ -570,6 +565,17 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen } + @Override + public void subscriptionRegistered(final Subscription sub) + { + _subscription = sub; + } + + @Override + public void subscriptionRemoved(final Subscription sub) + { + } + public long getUnacknowledgedBytes() { return _unacknowledgedBytes.longValue(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 9df1e7b89b..7e8623c171 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -21,19 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -42,6 +30,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -55,6 +44,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogActor; @@ -81,6 +71,7 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionTarget; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; @@ -123,7 +114,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private IncomingMessage _currentMessage; /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ - private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); + private final Map<AMQShortString, SubscriptionTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, SubscriptionTarget_0_8>(); private final MessageStore _messageStore; @@ -498,9 +489,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - public Subscription getSubscription(AMQShortString subscription) + public Subscription getSubscription(AMQShortString tag) { - return _tag2SubscriptionMap.get(subscription); + final SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); + return target == null ? null : target.getSubscription(); } /** @@ -526,34 +518,57 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F tag = new AMQShortString("sgen_" + getNextConsumerTag()); } - if (_tag2SubscriptionMap.containsKey(tag)) + if (_tag2SubscriptionTargetMap.containsKey(tag)) { throw new AMQException("Consumer already exists with same tag: " + tag); } - Subscription subscription = - SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); + SubscriptionTarget_0_8 target; + EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class); + if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) + { + target = SubscriptionTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager); + options.add(Subscription.Option.TRANSIENT); + } + else if(acks) + { + target = SubscriptionTarget_0_8.createAckTarget(this, tag, filters, _creditManager); + options.add(Subscription.Option.ACQUIRES); + options.add(Subscription.Option.SEES_REQUEUES); + } + else + { + target = SubscriptionTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); + options.add(Subscription.Option.ACQUIRES); + options.add(Subscription.Option.SEES_REQUEUES); + } + + if(exclusive) + { + options.add(Subscription.Option.EXCLUSIVE); + } // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. // We add before we register as the Async Delivery process may AutoClose the subscriber // so calling _cT2QM.remove before we have done put which was after the register succeeded. // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. - _tag2SubscriptionMap.put(tag, subscription); + _tag2SubscriptionTargetMap.put(tag, target); try { - queue.registerSubscription(subscription, exclusive); + Subscription sub = + queue.registerSubscription(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), options); } catch (AMQException e) { - _tag2SubscriptionMap.remove(tag); + _tag2SubscriptionTargetMap.remove(tag); throw e; } catch (RuntimeException e) { - _tag2SubscriptionMap.remove(tag); + _tag2SubscriptionTargetMap.remove(tag); throw e; } return tag; @@ -568,7 +583,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException { - Subscription sub = _tag2SubscriptionMap.remove(consumerTag); + SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); + Subscription sub = target == null ? null : target.getSubscription(); if (sub != null) { try @@ -634,7 +650,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { if (_logger.isInfoEnabled()) { - if (!_tag2SubscriptionMap.isEmpty()) + if (!_tag2SubscriptionTargetMap.isEmpty()) { _logger.info("Unsubscribing all consumers on channel " + toString()); } @@ -644,14 +660,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet()) + for (Map.Entry<AMQShortString, SubscriptionTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet()) { if (_logger.isInfoEnabled()) { _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - Subscription sub = me.getValue(); + Subscription sub = me.getValue().getSubscription(); try { @@ -665,7 +681,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - _tag2SubscriptionMap.clear(); + _tag2SubscriptionTargetMap.clear(); } /** @@ -977,9 +993,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if (wasSuspended) { // may need to deliver queued messages - for (Subscription s : _tag2SubscriptionMap.values()) + for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values()) { - s.getQueue().deliverAsync(s); + s.getSubscription().getQueue().deliverAsync(s.getSubscription()); } } @@ -993,15 +1009,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if (!wasSuspended) { // may need to deliver queued messages - for (Subscription s : _tag2SubscriptionMap.values()) + for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values()) { try { - s.getSendLock(); + s.getSubscription().getSendLock(); } finally { - s.releaseSendLock(); + s.getSubscription().releaseSendLock(); } } } @@ -1078,10 +1094,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F boolean requiresSuspend = _suspended.compareAndSet(false,true); // ensure all subscriptions have seen the change to the channel state - for(Subscription sub : _tag2SubscriptionMap.values()) + for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) { - sub.getSendLock(); - sub.releaseSendLock(); + sub.getSubscription().getSendLock(); + sub.getSubscription().releaseSendLock(); } try @@ -1116,9 +1132,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if(requiresSuspend) { _suspended.set(false); - for(Subscription sub : _tag2SubscriptionMap.values()) + for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) { - sub.getQueue().deliverAsync(sub); + sub.getSubscription().getQueue().deliverAsync(sub.getSubscription()); } } @@ -1672,6 +1688,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F @Override public int getConsumerCount() { - return _tag2SubscriptionMap.size(); + return _tag2SubscriptionTargetMap.size(); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java deleted file mode 100644 index 6646dc0cc2..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; - -/** - * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory - * primarily assists testing although in future more sophisticated subscribers may need a different subscription - * implementation. - * - * @see org.apache.qpid.server.queue.AMQQueue - */ -public interface SubscriptionFactory -{ - Subscription createSubscription(int channel, - AMQProtocolSession protocolSession, - AMQShortString consumerTag, - boolean acks, - FieldTable filters, - boolean noLocal, FlowCreditManager creditManager) throws AMQException; - - - Subscription createSubscription(AMQChannel channel, - AMQProtocolSession protocolSession, - AMQShortString consumerTag, - boolean acks, - FieldTable filters, - boolean noLocal, - FlowCreditManager creditManager, - ClientDeliveryMethod clientMethod, - RecordDeliveryMethod recordMethod) throws AMQException; - - - Subscription createBasicGetNoAckSubscription(AMQChannel channel, - AMQProtocolSession session, - AMQShortString consumerTag, - FieldTable filters, - boolean noLocal, - FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) throws AMQException; - -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java deleted file mode 100644 index 05f35748ee..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.AMQException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.filter.FilterManagerFactory; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.DelegatingSubscription; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; - -public class SubscriptionFactoryImpl implements SubscriptionFactory -{ - - public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession, - AMQShortString consumerTag, boolean acks, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager) throws AMQException - { - AMQChannel channel = protocolSession.getChannel(channelId); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session"); - } - ClientDeliveryMethod clientMethod = channel.getClientDeliveryMethod(); - RecordDeliveryMethod recordMethod = channel.getRecordDeliveryMethod(); - - - return createSubscription(channel, protocolSession, consumerTag, acks, filters, - noLocal, - creditManager, - clientMethod, - recordMethod - ); - } - - - public Subscription createSubscription(final AMQChannel channel, - final AMQProtocolSession protocolSession, - final AMQShortString consumerTag, - final boolean acks, - final FieldTable filters, - final boolean noLocal, - final FlowCreditManager creditManager, - final ClientDeliveryMethod clientMethod, - final RecordDeliveryMethod recordMethod - ) - throws AMQException - { - boolean isBrowser; - SubscriptionTarget_0_8 target; - Subscription subscription; - - if (filters != null) - { - Boolean isBrowserObj = (Boolean) filters.get(AMQPFilterTypes.NO_CONSUME.getValue()); - isBrowser = (isBrowserObj != null) && isBrowserObj.booleanValue(); - } - else - { - isBrowser = false; - } - - final FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters)); - boolean acquires; - boolean seesReuques; - boolean isTransient; - if(isBrowser) - { - target = new SubscriptionTarget_0_8.BrowserSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod); - acquires = false; - seesReuques = false; - isTransient = true; - } - else if(acks) - { - target = new SubscriptionTarget_0_8.AckSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod); - acquires = true; - seesReuques = true; - isTransient = false; - } - else - { - target = new SubscriptionTarget_0_8.NoAckSubscription(channel, consumerTag, filters, creditManager, clientMethod, recordMethod); - acquires = true; - seesReuques = true; - isTransient = false; - } - subscription = - new DelegatingSubscription<SubscriptionTarget_0_8>(filterManager, AMQMessage.class, acquires, seesReuques, AMQShortString.toString(consumerTag),isTransient,target); - target.setSubscription(subscription); - return subscription; - } - - - - public Subscription createBasicGetNoAckSubscription(final AMQChannel channel, - final AMQProtocolSession session, - final AMQShortString consumerTag, - final FieldTable filters, - final boolean noLocal, - final FlowCreditManager creditManager, - final ClientDeliveryMethod deliveryMethod, - final RecordDeliveryMethod recordMethod) throws AMQException - { - SubscriptionTarget_0_8 target = new SubscriptionTarget_0_8.NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); - - Subscription subscription; - final FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters)); - subscription = - new DelegatingSubscription<SubscriptionTarget_0_8>(filterManager, AMQMessage.class, true, true, AMQShortString.toString(consumerTag),true,target); - target.setSubscription(subscription); - return subscription; - } - - public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl(); - -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java index 6c91e6130e..f7acad87a1 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java @@ -76,6 +76,13 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget private Subscription _subscription; + public static SubscriptionTarget_0_8 createBrowserTarget(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager) throws AMQException + { + return new BrowserSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); + } + static final class BrowserSubscription extends SubscriptionTarget_0_8 { public BrowserSubscription(AMQChannel channel, @@ -120,6 +127,22 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } + public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager) throws AMQException + { + return new NoAckSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); + } + + public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException + { + return new NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + } + public static class NoAckSubscription extends SubscriptionTarget_0_8 { private final AutoCommitTransaction _txn; @@ -220,6 +243,26 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } + + public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager) + throws AMQException + { + return new AckSubscription(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); + } + + + public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + return new AckSubscription(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod); + } + static final class AckSubscription extends SubscriptionTarget_0_8 { public AckSubscription(AMQChannel channel, @@ -317,14 +360,20 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } } - public void setSubscription(Subscription subscription) + public Subscription getSubscription() { - _subscription = subscription; + return _subscription; } - public Subscription getSubscription() + @Override + public void subscriptionRemoved(final Subscription sub) { - return _subscription; + } + + @Override + public void subscriptionRegistered(final Subscription sub) + { + _subscription = sub; } public AMQSessionModel getSessionModel() diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index a48ae3826e..d5f7aad93d 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -24,6 +24,7 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; @@ -33,8 +34,10 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.MessageOnlyCreditManager; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v0_8.SubscriptionTarget_0_8; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; @@ -42,9 +45,10 @@ import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.EnumSet; + public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody> { private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); @@ -151,17 +155,24 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } }; - Subscription sub; + SubscriptionTarget_0_8 target; + EnumSet<Subscription.Option> options = EnumSet.of(Subscription.Option.TRANSIENT, Subscription.Option.ACQUIRES, + Subscription.Option.SEES_REQUEUES); if(acks) { - sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod); + + target = SubscriptionTarget_0_8.createAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); } else { - sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod); + target = SubscriptionTarget_0_8.createNoAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); } - queue.registerSubscription(sub,false); + Subscription sub = queue.registerSubscription(target, null, AMQMessage.class, "", options); sub.flush(); queue.unregisterSubscription(sub); return(!singleMessageCredit.hasCredit()); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 2243cbff11..edb60a0c6c 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; import java.util.ArrayList; +import java.util.EnumSet; import java.util.Set; /** @@ -47,6 +48,7 @@ import java.util.Set; */ public class AckTest extends QpidTestCase { + private SubscriptionTarget_0_8 _subscriptionTarget; private Subscription _subscription; private AMQProtocolSession _protocolSession; @@ -86,7 +88,6 @@ public class AckTest extends QpidTestCase private void publishMessages(int count, boolean persistent) throws AMQException { - _queue.registerSubscription(_subscription,false); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -178,7 +179,10 @@ public class AckTest extends QpidTestCase */ public void testAckChannelAssociationTest() throws AMQException { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); + _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, new LimitlessCreditManager()); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Subscription.Option.SEES_REQUEUES, + Subscription.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -202,7 +206,16 @@ public class AckTest extends QpidTestCase public void testNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); + _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _subscription = _queue.registerSubscription(_subscriptionTarget, + null, + AMQMessage.class, + DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Subscription.Option.SEES_REQUEUES, + Subscription.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -218,7 +231,13 @@ public class AckTest extends QpidTestCase public void testPersistentNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); + + _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Subscription.Option.SEES_REQUEUES, Subscription.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); @@ -235,7 +254,15 @@ public class AckTest extends QpidTestCase */ public void testSingleAckReceivedTest() throws AMQException { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + + _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Subscription.Option.SEES_REQUEUES, + Subscription.Option.ACQUIRES)); + final int msgCount = 10; publishMessages(msgCount); @@ -264,7 +291,15 @@ public class AckTest extends QpidTestCase */ public void testMultiAckReceivedTest() throws AMQException { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + + _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Subscription.Option.SEES_REQUEUES, + Subscription.Option.ACQUIRES)); + final int msgCount = 10; publishMessages(msgCount); @@ -290,7 +325,15 @@ public class AckTest extends QpidTestCase */ public void testMultiAckAllReceivedTest() throws AMQException { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + + _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Subscription.Option.SEES_REQUEUES, + Subscription.Option.ACQUIRES)); + final int msgCount = 10; publishMessages(msgCount); @@ -319,8 +362,12 @@ public class AckTest extends QpidTestCase // Send 10 messages Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, - DEFAULT_CONSUMER_TAG, true, null, false, creditManager); + + _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); + _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Subscription.Option.SEES_REQUEUES, + Subscription.Option.ACQUIRES)); + final int msgCount = 1; publishMessages(msgCount); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java index 36a57fa05f..88702e66f4 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java @@ -37,6 +37,9 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * QPID-1385 : Race condition between added to unacked map and resending due to a rollback. * @@ -105,9 +108,11 @@ public class ExtractResendAndRequeueTest extends TestCase */ private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList) { - Subscription subscription = new MockSubscription(); + Subscription subscription = mock(Subscription.class); + when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription)); + when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement()); - // Aquire messages in subscription + // Acquire messages in subscription for (QueueEntry entry : messageList) { entry.acquire(subscription); @@ -157,7 +162,7 @@ public class ExtractResendAndRequeueTest extends TestCase Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList); // Close subscription - subscription.close(); + when(subscription.isClosed()).thenReturn(true); final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java deleted file mode 100644 index e0d1b28007..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.logging.UnitTestMessageLogger; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; - -public class SubscriptionFactoryImplTest extends QpidTestCase -{ - private AMQChannel _channel; - private AMQProtocolSession _session; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper_0_8.createChannel(); - _session = _channel.getProtocolSession(); - GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false)); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - /** - * Tests that while creating Subscriptions of various types, the - * ID numbers assigned are allocated from a common sequence - * (in increasing order). - */ - public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception - { - //create a No-Ack subscription, get the first Subscription ID - long previousId = 0; - Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager()); - previousId = noAckSub.getSubscriptionID(); - - //create an ack subscription, verify the next Subscription ID is used - Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID()); - previousId = ackSub.getSubscriptionID(); - - //create a browser subscription - FieldTable filters = new FieldTable(); - filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - Subscription browserSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, browserSub.getSubscriptionID()); - previousId = browserSub.getSubscriptionID(); - - //create an BasicGet NoAck subscription - Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false, - _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID()); - previousId = getNoAckSub.getSubscriptionID(); - - } - -} diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index c590a18823..264c662364 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -110,6 +111,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS QueueDestination qd = null; AMQQueue queue = null; + EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class); boolean noLocal = false; @@ -173,14 +175,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS source.setFilter(actualFilters.isEmpty() ? null : actualFilters); _target = new SubscriptionTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY); - _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter), - Message_1_0.class, - source.getDistributionMode() != StdDistMode.COPY, - source.getDistributionMode() != StdDistMode.COPY, - getEndpoint().getName(), - false, - _target); - _target.setSubscription(_subscription); + if(source.getDistributionMode() != StdDistMode.COPY) + { + options.add(Subscription.Option.ACQUIRES); + options.add(Subscription.Option.SEES_REQUEUES); + } } else if(destination instanceof ExchangeDestination) @@ -373,26 +372,27 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _target = new SubscriptionTarget_1_0(this, true); - _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter), - Message_1_0.class, - true, - true, - getEndpoint().getName(), - false, - _target); - _target.setSubscription(_subscription); + options.add(Subscription.Option.ACQUIRES); + options.add(Subscription.Option.SEES_REQUEUES); } - if(_subscription != null) + if(_target != null) { + if(noLocal) + { + options.add(Subscription.Option.NO_LOCAL); + } + + _subscription.setNoLocal(noLocal); try { - - queue.registerSubscription(_subscription, false); + _subscription = queue.registerSubscription(_target, + messageFilter == null ? null : new SimpleFilterManager(messageFilter), + Message_1_0.class, getEndpoint().getName(), options); } catch (AMQException e) { diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java index ebff2d2ee7..55c8407ea5 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java @@ -74,11 +74,6 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget _acquires = acquires; } - public void setSubscription(Subscription sub) - { - _subscription = sub; - } - public Subscription getSubscription() { return _subscription; @@ -505,6 +500,18 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget } @Override + public void subscriptionRegistered(final Subscription sub) + { + _subscription = sub; + } + + @Override + public void subscriptionRemoved(final Subscription sub) + { + + } + + @Override public long getUnacknowledgedBytes() { // TODO |