From 3878f34525cde97ad35f27caf81adf41ab325730 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 4 Feb 2014 22:07:44 +0000 Subject: move state to MessageInstance from QueueEntry, change isRejectedBy to work on Subscription rather than id git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564522 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/message/MessageInstance.java | 121 +++++++++++++++++ .../org/apache/qpid/server/queue/QueueEntry.java | 143 +-------------------- .../apache/qpid/server/queue/QueueEntryImpl.java | 4 +- .../qpid/server/queue/QueueSubscription.java | 6 +- .../server/subscription/ClientDeliveryMethod.java | 31 ----- .../server/subscription/RecordDeliveryMethod.java | 28 ---- .../server/subscription/SubscriptionTarget.java | 5 +- .../apache/qpid/server/queue/MockQueueEntry.java | 2 +- .../qpid/server/queue/QueueEntryImplTestBase.java | 15 +-- .../qpid/server/subscription/MockSubscription.java | 7 +- .../v0_10/MessageAcceptCompletionListener.java | 2 +- .../protocol/v0_10/SubscriptionTarget_0_10.java | 10 +- .../qpid/server/protocol/v0_8/AMQChannel.java | 4 - .../server/protocol/v0_8/AMQProtocolEngine.java | 1 - .../server/protocol/v0_8/AMQProtocolSession.java | 1 - .../server/protocol/v0_8/ClientDeliveryMethod.java | 32 +++++ .../server/protocol/v0_8/RecordDeliveryMethod.java | 29 +++++ .../protocol/v0_8/SubscriptionTarget_0_8.java | 22 ++-- .../v0_8/handler/BasicGetMethodHandler.java | 4 +- .../protocol/v0_8/ExtractResendAndRequeueTest.java | 73 ++++++----- .../protocol/v0_8/InternalTestProtocolSession.java | 2 - .../protocol/v1_0/SubscriptionTarget_1_0.java | 4 +- 22 files changed, 259 insertions(+), 287 deletions(-) delete mode 100644 java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java delete mode 100644 java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java create mode 100644 java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java create mode 100644 java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 491e3a860f..39303a0715 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -22,10 +22,131 @@ package org.apache.qpid.server.message; import org.apache.qpid.AMQException; +import org.apache.qpid.server.subscription.Subscription; public interface MessageInstance { + + + public static enum State + { + AVAILABLE, + ACQUIRED, + DEQUEUED, + DELETED + } + + public abstract class EntryState + { + private EntryState() + { + } + + public abstract State getState(); + + /** + * Returns true if state is either DEQUEUED or DELETED. + * + * @return true if state is either DEQUEUED or DELETED. + */ + public boolean isDispensed() + { + State currentState = getState(); + return currentState == State.DEQUEUED || currentState == State.DELETED; + } + } + + + public final class AvailableState extends EntryState + { + + public State getState() + { + return State.AVAILABLE; + } + + public String toString() + { + return getState().name(); + } + } + + + public final class DequeuedState extends EntryState + { + + public State getState() + { + return State.DEQUEUED; + } + + public String toString() + { + return getState().name(); + } + } + + + public final class DeletedState extends EntryState + { + + public State getState() + { + return State.DELETED; + } + + public String toString() + { + return getState().name(); + } + } + + public final class NonSubscriptionAcquiredState extends EntryState + { + public State getState() + { + return State.ACQUIRED; + } + + public String toString() + { + return getState().name(); + } + } + + public final class SubscriptionAcquiredState extends EntryState + { + private final Subscription _subscription; + + public SubscriptionAcquiredState(Subscription subscription) + { + _subscription = subscription; + } + + + public State getState() + { + return State.ACQUIRED; + } + + public Subscription getSubscription() + { + return _subscription; + } + + public String toString() + { + return "{" + getState().name() + " : " + _subscription +"}"; + } + } + + + final static EntryState AVAILABLE_STATE = new AvailableState(); + final static EntryState DELETED_STATE = new DeletedState(); + final static EntryState DEQUEUED_STATE = new DequeuedState(); + final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); + boolean isAvailable(); boolean acquire(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 212f2f65fa..5dd1198ba8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -31,147 +31,6 @@ import org.apache.qpid.server.util.StateChangeListener; public interface QueueEntry extends MessageInstance, Comparable { - - - public static enum State - { - AVAILABLE, - ACQUIRED, - EXPIRED, - DEQUEUED, - DELETED; - - - } - - public abstract class EntryState - { - private EntryState() - { - } - - public abstract State getState(); - - /** - * Returns true if state is either DEQUEUED or DELETED. - * - * @return true if state is either DEQUEUED or DELETED. - */ - public boolean isDispensed() - { - State currentState = getState(); - return currentState == State.DEQUEUED || currentState == State.DELETED; - } - } - - - public final class AvailableState extends EntryState - { - - public State getState() - { - return State.AVAILABLE; - } - - public String toString() - { - return getState().name(); - } - } - - - public final class DequeuedState extends EntryState - { - - public State getState() - { - return State.DEQUEUED; - } - - public String toString() - { - return getState().name(); - } - } - - - public final class DeletedState extends EntryState - { - - public State getState() - { - return State.DELETED; - } - - public String toString() - { - return getState().name(); - } - } - - public final class ExpiredState extends EntryState - { - - public State getState() - { - return State.EXPIRED; - } - - public String toString() - { - return getState().name(); - } - } - - - public final class NonSubscriptionAcquiredState extends EntryState - { - public State getState() - { - return State.ACQUIRED; - } - - public String toString() - { - return getState().name(); - } - } - - public final class SubscriptionAcquiredState extends EntryState - { - private final Subscription _subscription; - - public SubscriptionAcquiredState(Subscription subscription) - { - _subscription = subscription; - } - - - public State getState() - { - return State.ACQUIRED; - } - - public Subscription getSubscription() - { - return _subscription; - } - - public String toString() - { - return "{" + getState().name() + " : " + _subscription +"}"; - } - } - - - final static EntryState AVAILABLE_STATE = new AvailableState(); - final static EntryState DELETED_STATE = new DeletedState(); - final static EntryState DEQUEUED_STATE = new DequeuedState(); - final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); - - - - AMQQueue getQueue(); long getSize(); @@ -193,7 +52,7 @@ public interface QueueEntry extends MessageInstance, Comparable void reject(); - boolean isRejectedBy(long subscriptionId); + boolean isRejectedBy(Subscription subscription); int routeToAlternate(final Action action, ServerTransaction txn); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 1b5d63ae45..18a5db597f 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -294,12 +294,12 @@ public abstract class QueueEntryImpl implements QueueEntry } } - public boolean isRejectedBy(long subscriptionId) + public boolean isRejectedBy(Subscription subscription) { if (_rejectedBy != null) // We have subscriptions that rejected this message { - return _rejectedBy.contains(subscriptionId); + return _rejectedBy.contains(subscription.getSubscriptionID()); } else // This message hasn't been rejected yet. { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java index 8bab8e7b4b..08ecf58855 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java @@ -227,13 +227,13 @@ class QueueSubscription implements Subscription @Override public boolean wouldSuspend(final QueueEntry msg) { - return !_target.allocateCredit(msg); + return !_target.allocateCredit(msg.getMessage()); } @Override public void restoreCredit(final QueueEntry queueEntry) { - _target.restoreCredit(queueEntry); + _target.restoreCredit(queueEntry.getMessage()); } @Override @@ -356,7 +356,7 @@ class QueueSubscription implements Subscription public final boolean hasInterest(QueueEntry entry) { //check that the message hasn't been rejected - if (entry.isRejectedBy(getSubscriptionID())) + if (entry.isRejectedBy(this)) { return false; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java deleted file mode 100644 index 45a1978af1..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java +++ /dev/null @@ -1,31 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.subscription; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.ServerMessage; - -public interface ClientDeliveryMethod -{ - void deliverToClient(final Subscription sub, final ServerMessage message, final InstanceProperties props, - final long deliveryTag) throws AMQException; -} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java deleted file mode 100644 index e2ed4104de..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java +++ /dev/null @@ -1,28 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.subscription; - -import org.apache.qpid.server.queue.QueueEntry; - -public interface RecordDeliveryMethod -{ - void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag); -} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java index bdcd0c15fb..f1d77d9d42 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.util.StateChangeListener; @@ -56,9 +57,9 @@ public interface SubscriptionTarget void queueEmpty() throws AMQException; - boolean allocateCredit(QueueEntry msg); + boolean allocateCredit(ServerMessage msg); - void restoreCredit(QueueEntry queueEntry); + void restoreCredit(ServerMessage queueEntry); boolean isSuspended(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 8ba83d73b1..dbc04bc0cd 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -118,7 +118,7 @@ public class MockQueueEntry implements QueueEntry } - public boolean isRejectedBy(long subscriptionId) + public boolean isRejectedBy(Subscription subscription) { return false; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 2753a89bf4..c2099ec186 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -23,8 +23,7 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.QueueEntry.EntryState; -import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.message.MessageInstance.EntryState; import org.apache.qpid.server.subscription.Subscription; import java.lang.reflect.Field; @@ -154,9 +153,8 @@ public abstract class QueueEntryImplTestBase extends TestCase public void testRejectAndRejectedBy() { Subscription sub = newMockSubscription(); - long subId = sub.getSubscriptionID(); - assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub)); assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired()); //acquire, reject, and release the message using the subscription @@ -165,19 +163,18 @@ public abstract class QueueEntryImplTestBase extends TestCase _queueEntry.release(); //verify the rejection is recorded - assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub)); //repeat rejection using a second subscription Subscription sub2 = newMockSubscription(); - long sub2Id = sub2.getSubscriptionID(); - assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); + assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2)); assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2)); _queueEntry.reject(); //verify it still records being rejected by both subscriptions - assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); - assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub)); + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2)); } /** diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 2e52d47326..cc5235a420 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -27,15 +27,14 @@ import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.filter.SimpleFilterManager; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.StateChangeListener; @@ -153,7 +152,7 @@ public class MockSubscription implements SubscriptionTarget { } - public void restoreCredit(QueueEntry queueEntry) + public void restoreCredit(ServerMessage message) { } @@ -215,7 +214,7 @@ public class MockSubscription implements SubscriptionTarget } @Override - public boolean allocateCredit(final QueueEntry msg) + public boolean allocateCredit(final ServerMessage msg) { return true; } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index 34a7f2e526..9b5272d413 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -44,7 +44,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene { if(_restoreCredit) { - _sub.restoreCredit(_entry); + _sub.restoreCredit(_entry.getMessage()); } if(_entry.isAcquiredBy(_sub.getSubscription())) { diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java index 6f985a160a..aee7acc99b 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java @@ -344,7 +344,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen { if (restoreCredit) { - restoreCredit(entry); + restoreCredit(entry.getMessage()); } entry.delete(); } @@ -433,14 +433,14 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen _deleted.set(true); } - public boolean allocateCredit(QueueEntry entry) + public boolean allocateCredit(ServerMessage message) { - return _creditManager.useCreditForMessage(entry.getMessage().getSize()); + return _creditManager.useCreditForMessage(message.getSize()); } - public void restoreCredit(QueueEntry queueEntry) + public void restoreCredit(ServerMessage message) { - _creditManager.restoreCredit(1, queueEntry.getSize()); + _creditManager.restoreCredit(1, message.getSize()); } public FlowCreditManager_0_10 getCreditManager() diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 3baa2f95f0..516d6e48ff 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -62,16 +62,12 @@ import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 8d45160848..131b88fee4 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -94,7 +94,6 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index 85d995518a..6bcd4b9d49 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -39,7 +39,6 @@ import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.virtualhost.VirtualHost; diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java new file mode 100644 index 0000000000..7f52aa9a08 --- /dev/null +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java @@ -0,0 +1,32 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.subscription.Subscription; + +public interface ClientDeliveryMethod +{ + void deliverToClient(final Subscription sub, final ServerMessage message, final InstanceProperties props, + final long deliveryTag) throws AMQException; +} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java new file mode 100644 index 0000000000..5ba0a2f893 --- /dev/null +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java @@ -0,0 +1,29 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.subscription.Subscription; + +public interface RecordDeliveryMethod +{ + void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag); +} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java index ae126281c0..a9d131a8d2 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java @@ -26,8 +26,6 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -35,8 +33,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.AbstractSubscriptionTarget; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -62,7 +58,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget { if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED)) { - restoreCredit(entry); + restoreCredit(entry.getMessage()); } entry.removeStateChangeListener(this); } @@ -120,7 +116,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } @Override - public boolean allocateCredit(QueueEntry msg) + public boolean allocateCredit(ServerMessage msg) { return true; } @@ -201,7 +197,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } @Override - public boolean allocateCredit(QueueEntry msg) + public boolean allocateCredit(ServerMessage msg) { return true; } @@ -236,9 +232,9 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } - public boolean allocateCredit(QueueEntry msg) + public boolean allocateCredit(ServerMessage msg) { - return getCreditManager().useCreditForMessage(msg.getMessage().getSize()); + return getCreditManager().useCreditForMessage(msg.getSize()); } } @@ -441,9 +437,9 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } - public boolean allocateCredit(QueueEntry msg) + public boolean allocateCredit(ServerMessage msg) { - return _creditManager.useCreditForMessage(msg.getMessage().getSize()); + return _creditManager.useCreditForMessage(msg.getSize()); } public AMQChannel getChannel() @@ -461,9 +457,9 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget return _channel.getProtocolSession(); } - public void restoreCredit(final QueueEntry queueEntry) + public void restoreCredit(final ServerMessage message) { - _creditManager.restoreCredit(1, queueEntry.getSize()); + _creditManager.restoreCredit(1, message.getSize()); } protected final StateChangeListener getReleasedStateChangeListener() diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 16bdf32427..e632fe02b4 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -42,8 +42,8 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; +import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; +import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.virtualhost.VirtualHost; diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java index 88702e66f4..a6e5f29964 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java @@ -23,20 +23,20 @@ package org.apache.qpid.server.protocol.v0_8; import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.QueueEntryIterator; -import org.apache.qpid.server.queue.SimpleQueueEntryList; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -62,40 +62,51 @@ public class ExtractResendAndRequeueTest extends TestCase private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap; private static final int INITIAL_MSG_COUNT = 10; - private AMQQueue _queue = new MockAMQQueue(getName()); + private AMQQueue _queue; private MessageStore _messageStore = new TestMemoryMessageStore(); private LinkedList _referenceList = new LinkedList(); + private Subscription _subscription; + private boolean _queueDeleted; @Override public void setUp() throws AMQException { + _queueDeleted = false; _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100); + _queue = mock(AMQQueue.class); + when(_queue.getName()).thenReturn(getName()); + when(_queue.isDeleted()).thenReturn(_queueDeleted); + _subscription = mock(Subscription.class); + when(_subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement()); + long id = 0; - SimpleQueueEntryList list = new SimpleQueueEntryList(_queue); // Add initial messages to QueueEntryList for (int count = 0; count < INITIAL_MSG_COUNT; count++) { - AMQMessage msg = new MockAMQMessage(id); - - list.add(msg); - + ServerMessage msg = mock(ServerMessage.class); + when(msg.getMessageNumber()).thenReturn(id); + final QueueEntry entry = mock(QueueEntry.class); + when(entry.getMessage()).thenReturn(msg); + when(entry.getQueue()).thenReturn(_queue); + when(entry.isQueueDeleted()).thenReturn(_queueDeleted); + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + when(entry.isDeleted()).thenReturn(true); + return null; + } + }).when(entry).delete(); + + _unacknowledgedMessageMap.add(id, entry); + _referenceList.add(entry); //Increment ID; id++; } - // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList - QueueEntryIterator queueEntries = list.iterator(); - while(queueEntries.advance()) - { - QueueEntry entry = queueEntries.getNode(); - _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry); - - // Store the entry for future inspection - _referenceList.add(entry); - } - assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size()); } @@ -106,19 +117,14 @@ public class ExtractResendAndRequeueTest extends TestCase * * @return Subscription that performed the acquire */ - private Subscription createSubscriptionAndAcquireMessages(LinkedList messageList) + private void acquireMessages(LinkedList messageList) { - Subscription subscription = mock(Subscription.class); - when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription)); - when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement()); // Acquire messages in subscription - for (QueueEntry entry : messageList) + for(QueueEntry entry : messageList) { - entry.acquire(subscription); + when(entry.getDeliveredSubscription()).thenReturn(_subscription); } - - return subscription; } /** @@ -133,7 +139,7 @@ public class ExtractResendAndRequeueTest extends TestCase public void testResend() throws AMQException { //We don't need the subscription object here. - createSubscriptionAndAcquireMessages(_referenceList); + acquireMessages(_referenceList); final Map msgToRequeue = new LinkedHashMap(); final Map msgToResend = new LinkedHashMap(); @@ -159,10 +165,10 @@ public class ExtractResendAndRequeueTest extends TestCase */ public void testRequeueDueToSubscriptionClosure() throws AMQException { - Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList); + acquireMessages(_referenceList); // Close subscription - when(subscription.isClosed()).thenReturn(true); + when(_subscription.isClosed()).thenReturn(true); final Map msgToRequeue = new LinkedHashMap(); final Map msgToResend = new LinkedHashMap(); @@ -239,8 +245,7 @@ public class ExtractResendAndRequeueTest extends TestCase final Map msgToRequeue = new LinkedHashMap(); final Map msgToResend = new LinkedHashMap(); - _queue.delete(); - + _queueDeleted = true; // requeueIfUnableToResend : value doesn't matter here as queue has been deleted _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, msgToResend, false, _messageStore)); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 6d3e758144..94dd0acf99 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -47,10 +47,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java index b4f59315df..14d9934c1d 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java @@ -284,7 +284,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget getEndpoint().detach(); } - public boolean allocateCredit(final QueueEntry msg) + public boolean allocateCredit(final ServerMessage msg) { synchronized (_link.getLock()) { @@ -308,7 +308,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget } - public void restoreCredit(final QueueEntry queueEntry) + public void restoreCredit(final ServerMessage message) { //TODO } -- cgit v1.2.1