summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-04 22:07:44 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-04 22:07:44 +0000
commit3878f34525cde97ad35f27caf81adf41ab325730 (patch)
tree9f409a6ce61228dda8bb3583d97895fb81a61a8c
parentf9c2d2e6cd6ea383c2c1c5d7c9bc96f929396e51 (diff)
downloadqpid-python-3878f34525cde97ad35f27caf81adf41ab325730.tar.gz
move state to MessageInstance from QueueEntry, change isRejectedBy to work on Subscription rather than id
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564522 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java121
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java143
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java5
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java2
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java15
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java7
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java2
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java10
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java1
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java1
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java)3
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java)3
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java22
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java73
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java2
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java4
20 files changed, 202 insertions, 230 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 491e3a860f..39303a0715 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -22,10 +22,131 @@ package org.apache.qpid.server.message;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.subscription.Subscription;
public interface MessageInstance
{
+
+
+ public static enum State
+ {
+ AVAILABLE,
+ ACQUIRED,
+ DEQUEUED,
+ DELETED
+ }
+
+ public abstract class EntryState
+ {
+ private EntryState()
+ {
+ }
+
+ public abstract State getState();
+
+ /**
+ * Returns true if state is either DEQUEUED or DELETED.
+ *
+ * @return true if state is either DEQUEUED or DELETED.
+ */
+ public boolean isDispensed()
+ {
+ State currentState = getState();
+ return currentState == State.DEQUEUED || currentState == State.DELETED;
+ }
+ }
+
+
+ public final class AvailableState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.AVAILABLE;
+ }
+
+ public String toString()
+ {
+ return getState().name();
+ }
+ }
+
+
+ public final class DequeuedState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.DEQUEUED;
+ }
+
+ public String toString()
+ {
+ return getState().name();
+ }
+ }
+
+
+ public final class DeletedState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.DELETED;
+ }
+
+ public String toString()
+ {
+ return getState().name();
+ }
+ }
+
+ public final class NonSubscriptionAcquiredState extends EntryState
+ {
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+
+ public String toString()
+ {
+ return getState().name();
+ }
+ }
+
+ public final class SubscriptionAcquiredState extends EntryState
+ {
+ private final Subscription _subscription;
+
+ public SubscriptionAcquiredState(Subscription subscription)
+ {
+ _subscription = subscription;
+ }
+
+
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+
+ public Subscription getSubscription()
+ {
+ return _subscription;
+ }
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _subscription +"}";
+ }
+ }
+
+
+ final static EntryState AVAILABLE_STATE = new AvailableState();
+ final static EntryState DELETED_STATE = new DeletedState();
+ final static EntryState DEQUEUED_STATE = new DequeuedState();
+ final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
+
boolean isAvailable();
boolean acquire();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 212f2f65fa..5dd1198ba8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -31,147 +31,6 @@ import org.apache.qpid.server.util.StateChangeListener;
public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
-
-
- public static enum State
- {
- AVAILABLE,
- ACQUIRED,
- EXPIRED,
- DEQUEUED,
- DELETED;
-
-
- }
-
- public abstract class EntryState
- {
- private EntryState()
- {
- }
-
- public abstract State getState();
-
- /**
- * Returns true if state is either DEQUEUED or DELETED.
- *
- * @return true if state is either DEQUEUED or DELETED.
- */
- public boolean isDispensed()
- {
- State currentState = getState();
- return currentState == State.DEQUEUED || currentState == State.DELETED;
- }
- }
-
-
- public final class AvailableState extends EntryState
- {
-
- public State getState()
- {
- return State.AVAILABLE;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class DequeuedState extends EntryState
- {
-
- public State getState()
- {
- return State.DEQUEUED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class DeletedState extends EntryState
- {
-
- public State getState()
- {
- return State.DELETED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
- public final class ExpiredState extends EntryState
- {
-
- public State getState()
- {
- return State.EXPIRED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class NonSubscriptionAcquiredState extends EntryState
- {
- public State getState()
- {
- return State.ACQUIRED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
- public final class SubscriptionAcquiredState extends EntryState
- {
- private final Subscription _subscription;
-
- public SubscriptionAcquiredState(Subscription subscription)
- {
- _subscription = subscription;
- }
-
-
- public State getState()
- {
- return State.ACQUIRED;
- }
-
- public Subscription getSubscription()
- {
- return _subscription;
- }
-
- public String toString()
- {
- return "{" + getState().name() + " : " + _subscription +"}";
- }
- }
-
-
- final static EntryState AVAILABLE_STATE = new AvailableState();
- final static EntryState DELETED_STATE = new DeletedState();
- final static EntryState DEQUEUED_STATE = new DequeuedState();
- final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
-
-
-
-
AMQQueue getQueue();
long getSize();
@@ -193,7 +52,7 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
void reject();
- boolean isRejectedBy(long subscriptionId);
+ boolean isRejectedBy(Subscription subscription);
int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 1b5d63ae45..18a5db597f 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -294,12 +294,12 @@ public abstract class QueueEntryImpl implements QueueEntry
}
}
- public boolean isRejectedBy(long subscriptionId)
+ public boolean isRejectedBy(Subscription subscription)
{
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
- return _rejectedBy.contains(subscriptionId);
+ return _rejectedBy.contains(subscription.getSubscriptionID());
}
else // This message hasn't been rejected yet.
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
index 8bab8e7b4b..08ecf58855 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
@@ -227,13 +227,13 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
@Override
public boolean wouldSuspend(final QueueEntry msg)
{
- return !_target.allocateCredit(msg);
+ return !_target.allocateCredit(msg.getMessage());
}
@Override
public void restoreCredit(final QueueEntry queueEntry)
{
- _target.restoreCredit(queueEntry);
+ _target.restoreCredit(queueEntry.getMessage());
}
@Override
@@ -356,7 +356,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
public final boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
+ if (entry.isRejectedBy(this))
{
return false;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
index bdcd0c15fb..f1d77d9d42 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.subscription;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.util.StateChangeListener;
@@ -56,9 +57,9 @@ public interface SubscriptionTarget
void queueEmpty() throws AMQException;
- boolean allocateCredit(QueueEntry msg);
+ boolean allocateCredit(ServerMessage msg);
- void restoreCredit(QueueEntry queueEntry);
+ void restoreCredit(ServerMessage queueEntry);
boolean isSuspended();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 8ba83d73b1..dbc04bc0cd 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -118,7 +118,7 @@ public class MockQueueEntry implements QueueEntry
}
- public boolean isRejectedBy(long subscriptionId)
+ public boolean isRejectedBy(Subscription subscription)
{
return false;
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 2753a89bf4..c2099ec186 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -23,8 +23,7 @@ import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.QueueEntry.EntryState;
-import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.subscription.Subscription;
import java.lang.reflect.Field;
@@ -154,9 +153,8 @@ public abstract class QueueEntryImplTestBase extends TestCase
public void testRejectAndRejectedBy()
{
Subscription sub = newMockSubscription();
- long subId = sub.getSubscriptionID();
- assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+ assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired());
//acquire, reject, and release the message using the subscription
@@ -165,19 +163,18 @@ public abstract class QueueEntryImplTestBase extends TestCase
_queueEntry.release();
//verify the rejection is recorded
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+ assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
//repeat rejection using a second subscription
Subscription sub2 = newMockSubscription();
- long sub2Id = sub2.getSubscriptionID();
- assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+ assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2));
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
_queueEntry.reject();
//verify it still records being rejected by both subscriptions
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+ assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
+ assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2));
}
/**
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 2e52d47326..cc5235a420 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -27,15 +27,14 @@ import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.StateChangeListener;
@@ -153,7 +152,7 @@ public class MockSubscription implements SubscriptionTarget
{
}
- public void restoreCredit(QueueEntry queueEntry)
+ public void restoreCredit(ServerMessage message)
{
}
@@ -215,7 +214,7 @@ public class MockSubscription implements SubscriptionTarget
}
@Override
- public boolean allocateCredit(final QueueEntry msg)
+ public boolean allocateCredit(final ServerMessage msg)
{
return true;
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index 34a7f2e526..9b5272d413 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -44,7 +44,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
{
if(_restoreCredit)
{
- _sub.restoreCredit(_entry);
+ _sub.restoreCredit(_entry.getMessage());
}
if(_entry.isAcquiredBy(_sub.getSubscription()))
{
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
index 6f985a160a..aee7acc99b 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
@@ -344,7 +344,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
{
if (restoreCredit)
{
- restoreCredit(entry);
+ restoreCredit(entry.getMessage());
}
entry.delete();
}
@@ -433,14 +433,14 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
_deleted.set(true);
}
- public boolean allocateCredit(QueueEntry entry)
+ public boolean allocateCredit(ServerMessage message)
{
- return _creditManager.useCreditForMessage(entry.getMessage().getSize());
+ return _creditManager.useCreditForMessage(message.getSize());
}
- public void restoreCredit(QueueEntry queueEntry)
+ public void restoreCredit(ServerMessage message)
{
- _creditManager.restoreCredit(1, queueEntry.getSize());
+ _creditManager.restoreCredit(1, message.getSize());
}
public FlowCreditManager_0_10 getCreditManager()
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 3baa2f95f0..516d6e48ff 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -62,16 +62,12 @@ import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 8d45160848..131b88fee4 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -94,7 +94,6 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
index 85d995518a..6bcd4b9d49 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
@@ -39,7 +39,6 @@ import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.virtualhost.VirtualHost;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
index 45a1978af1..7f52aa9a08 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
@@ -18,11 +18,12 @@
* under the License.
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription;
public interface ClientDeliveryMethod
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
index e2ed4104de..5ba0a2f893 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
@@ -18,9 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.Subscription;
public interface RecordDeliveryMethod
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
index ae126281c0..a9d131a8d2 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
@@ -26,8 +26,6 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -35,8 +33,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -62,7 +58,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
{
if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
{
- restoreCredit(entry);
+ restoreCredit(entry.getMessage());
}
entry.removeStateChangeListener(this);
}
@@ -120,7 +116,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
@Override
- public boolean allocateCredit(QueueEntry msg)
+ public boolean allocateCredit(ServerMessage msg)
{
return true;
}
@@ -201,7 +197,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
@Override
- public boolean allocateCredit(QueueEntry msg)
+ public boolean allocateCredit(ServerMessage msg)
{
return true;
}
@@ -236,9 +232,9 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
- public boolean allocateCredit(QueueEntry msg)
+ public boolean allocateCredit(ServerMessage msg)
{
- return getCreditManager().useCreditForMessage(msg.getMessage().getSize());
+ return getCreditManager().useCreditForMessage(msg.getSize());
}
}
@@ -441,9 +437,9 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
- public boolean allocateCredit(QueueEntry msg)
+ public boolean allocateCredit(ServerMessage msg)
{
- return _creditManager.useCreditForMessage(msg.getMessage().getSize());
+ return _creditManager.useCreditForMessage(msg.getSize());
}
public AMQChannel getChannel()
@@ -461,9 +457,9 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
return _channel.getProtocolSession();
}
- public void restoreCredit(final QueueEntry queueEntry)
+ public void restoreCredit(final ServerMessage message)
{
- _creditManager.restoreCredit(1, queueEntry.getSize());
+ _creditManager.restoreCredit(1, message.getSize());
}
protected final StateChangeListener<QueueEntry, QueueEntry.State> getReleasedStateChangeListener()
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 16bdf32427..e632fe02b4 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -42,8 +42,8 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
index 88702e66f4..a6e5f29964 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
@@ -23,20 +23,20 @@ package org.apache.qpid.server.protocol.v0_8;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.queue.SimpleQueueEntryList;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -62,40 +62,51 @@ public class ExtractResendAndRequeueTest extends TestCase
private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
- private AMQQueue _queue = new MockAMQQueue(getName());
+ private AMQQueue _queue;
private MessageStore _messageStore = new TestMemoryMessageStore();
private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
+ private Subscription _subscription;
+ private boolean _queueDeleted;
@Override
public void setUp() throws AMQException
{
+ _queueDeleted = false;
_unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);
+ _queue = mock(AMQQueue.class);
+ when(_queue.getName()).thenReturn(getName());
+ when(_queue.isDeleted()).thenReturn(_queueDeleted);
+ _subscription = mock(Subscription.class);
+ when(_subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
+
long id = 0;
- SimpleQueueEntryList list = new SimpleQueueEntryList(_queue);
// Add initial messages to QueueEntryList
for (int count = 0; count < INITIAL_MSG_COUNT; count++)
{
- AMQMessage msg = new MockAMQMessage(id);
-
- list.add(msg);
-
+ ServerMessage msg = mock(ServerMessage.class);
+ when(msg.getMessageNumber()).thenReturn(id);
+ final QueueEntry entry = mock(QueueEntry.class);
+ when(entry.getMessage()).thenReturn(msg);
+ when(entry.getQueue()).thenReturn(_queue);
+ when(entry.isQueueDeleted()).thenReturn(_queueDeleted);
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ when(entry.isDeleted()).thenReturn(true);
+ return null;
+ }
+ }).when(entry).delete();
+
+ _unacknowledgedMessageMap.add(id, entry);
+ _referenceList.add(entry);
//Increment ID;
id++;
}
- // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList
- QueueEntryIterator queueEntries = list.iterator();
- while(queueEntries.advance())
- {
- QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
-
- // Store the entry for future inspection
- _referenceList.add(entry);
- }
-
assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
}
@@ -106,19 +117,14 @@ public class ExtractResendAndRequeueTest extends TestCase
*
* @return Subscription that performed the acquire
*/
- private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList)
+ private void acquireMessages(LinkedList<QueueEntry> messageList)
{
- Subscription subscription = mock(Subscription.class);
- when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
- when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
// Acquire messages in subscription
- for (QueueEntry entry : messageList)
+ for(QueueEntry entry : messageList)
{
- entry.acquire(subscription);
+ when(entry.getDeliveredSubscription()).thenReturn(_subscription);
}
-
- return subscription;
}
/**
@@ -133,7 +139,7 @@ public class ExtractResendAndRequeueTest extends TestCase
public void testResend() throws AMQException
{
//We don't need the subscription object here.
- createSubscriptionAndAcquireMessages(_referenceList);
+ acquireMessages(_referenceList);
final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
@@ -159,10 +165,10 @@ public class ExtractResendAndRequeueTest extends TestCase
*/
public void testRequeueDueToSubscriptionClosure() throws AMQException
{
- Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList);
+ acquireMessages(_referenceList);
// Close subscription
- when(subscription.isClosed()).thenReturn(true);
+ when(_subscription.isClosed()).thenReturn(true);
final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
@@ -239,8 +245,7 @@ public class ExtractResendAndRequeueTest extends TestCase
final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
- _queue.delete();
-
+ _queueDeleted = true;
// requeueIfUnableToResend : value doesn't matter here as queue has been deleted
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
msgToResend, false, _messageStore));
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 6d3e758144..94dd0acf99 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -47,10 +47,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
index b4f59315df..14d9934c1d 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
@@ -284,7 +284,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
getEndpoint().detach();
}
- public boolean allocateCredit(final QueueEntry msg)
+ public boolean allocateCredit(final ServerMessage msg)
{
synchronized (_link.getLock())
{
@@ -308,7 +308,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
}
- public void restoreCredit(final QueueEntry queueEntry)
+ public void restoreCredit(final ServerMessage message)
{
//TODO
}