summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-08-24 20:37:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-08-24 20:37:19 +0000
commite2fc7fa318d76d1fbcf14c60e43e2ba7cb431bb3 (patch)
tree8ea147f7cc7c5b997686a14810e6a9c72afd3bb1
parent3cf863f0367964e05b75ce790488a237381fcddf (diff)
downloadqpid-python-e2fc7fa318d76d1fbcf14c60e43e2ba7cb431bb3.tar.gz
Implement 0-10 flow control
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@807369 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java178
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java204
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java173
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java124
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java60
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java27
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java10
20 files changed, 812 insertions, 44 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
index 8a9d547b55..c5f2d1e808 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
@@ -43,7 +43,7 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager
return _bytesCredit.get();
}
- public void addCredit(long messageCredit, long bytesCredit)
+ public void restoreCredit(long messageCredit, long bytesCredit)
{
_bytesCredit.addAndGet(bytesCredit);
setSuspended(false);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
new file mode 100644
index 0000000000..dec0bca576
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.flow;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+{
+ private volatile long _bytesCredit;
+ private volatile long _messageCredit;
+
+
+ public CreditCreditManager()
+ {
+ this(0L, 0L);
+ }
+
+ public CreditCreditManager(long bytesCredit, long messageCredit)
+ {
+ _bytesCredit = bytesCredit;
+ _messageCredit = messageCredit;
+ }
+
+
+ public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit)
+ {
+ _bytesCredit = bytesCredit;
+ _messageCredit = messageCredit;
+
+ setSuspended(!hasCredit());
+
+ }
+
+
+ public long getMessageCredit()
+ {
+ return _messageCredit == -1L
+ ? Long.MAX_VALUE
+ : _messageCredit;
+ }
+
+ public long getBytesCredit()
+ {
+ return _bytesCredit == -1L
+ ? Long.MAX_VALUE
+ : _bytesCredit;
+ }
+
+ public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
+ {
+
+ }
+
+
+ public synchronized void addCredit(final long messageCredit, final long bytesCredit)
+ {
+ boolean notifyIncrease = true;
+ if(_messageCredit >= 0L && messageCredit > 0L)
+ {
+ notifyIncrease = _messageCredit != 0L;
+ _messageCredit += messageCredit;
+ }
+
+
+
+ if(_bytesCredit >= 0L && bytesCredit > 0L)
+ {
+ notifyIncrease = notifyIncrease && bytesCredit>0;
+ _bytesCredit += bytesCredit;
+
+
+
+ if(notifyIncrease)
+ {
+ notifyIncreaseBytesCredit();
+ }
+ }
+
+
+
+ setSuspended(!hasCredit());
+
+ }
+
+
+
+ public synchronized boolean hasCredit()
+ {
+ // Note !=, if credit is < 0 that indicates infinite credit
+ return (_bytesCredit != 0L && _messageCredit != 0L);
+ }
+
+ public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ {
+ if(_messageCredit >= 0L)
+ {
+ if(_messageCredit > 0)
+ {
+ if(_bytesCredit < 0L)
+ {
+ _messageCredit--;
+
+ return true;
+ }
+ else if(msg.getSize() <= _bytesCredit)
+ {
+ _messageCredit--;
+ _bytesCredit -= msg.getSize();
+
+ return true;
+ }
+ else
+ {
+ //setSuspended(true);
+ return false;
+ }
+ }
+ else
+ {
+ setSuspended(true);
+ return false;
+ }
+ }
+ else if(_bytesCredit >= 0L)
+ {
+ if(msg.getSize() <= _bytesCredit)
+ {
+ _bytesCredit -= msg.getSize();
+
+ return true;
+ }
+ else
+ {
+ //setSuspended(true);
+ return false;
+ }
+
+ }
+ else
+ {
+ return true;
+ }
+
+ }
+
+ public synchronized void stop()
+ {
+ if(_bytesCredit > 0)
+ {
+ _bytesCredit = 0;
+ }
+ if(_messageCredit > 0)
+ {
+ _messageCredit = 0;
+ }
+
+ }
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
index 96e86da54a..59c3395929 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
@@ -38,11 +38,10 @@ public interface FlowCreditManager
boolean removeListener(FlowCreditManagerListener listener);
- public void addCredit(long messageCredit, long bytesCredit);
-
- public void removeAllCredit();
+ public void restoreCredit(long messageCredit, long bytesCredit);
public boolean hasCredit();
public boolean useCreditForMessage(ServerMessage msg);
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
index 3802dcf0f2..901b71fd1f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
@@ -34,7 +34,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements
return -1L;
}
- public void addCredit(long messageCredit, long bytesCredit)
+ public void restoreCredit(long messageCredit, long bytesCredit)
{
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
index dcbb37c153..19a9ac1d23 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
@@ -43,7 +43,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl
return _bytesCredit;
}
- public synchronized void addCredit(long messageCredit, long bytesCredit)
+ public synchronized void restoreCredit(long messageCredit, long bytesCredit)
{
_messageCredit += messageCredit;
_bytesCredit += bytesCredit;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
index 3c84af2228..a386f66b11 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
@@ -43,7 +43,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen
return -1L;
}
- public void addCredit(long messageCredit, long bytesCredit)
+ public void restoreCredit(long messageCredit, long bytesCredit)
{
_messageCredit.addAndGet(messageCredit);
setSuspended(false);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
index 77bbc82a14..026804439c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
@@ -91,7 +91,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
return _bytesCredit;
}
- public synchronized void addCredit(final long messageCredit, final long bytesCredit)
+ public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
final long messageCreditLimit = _messageCreditLimit;
boolean notifyIncrease = true;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
new file mode 100644
index 0000000000..7b91894526
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.flow;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+{
+ private volatile long _bytesCreditLimit;
+ private volatile long _messageCreditLimit;
+
+ private volatile long _bytesUsed;
+ private volatile long _messageUsed;
+
+ public WindowCreditManager()
+ {
+ this(0L, 0L);
+ }
+
+ public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit)
+ {
+ _bytesCreditLimit = bytesCreditLimit;
+ _messageCreditLimit = messageCreditLimit;
+ }
+
+
+ public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
+ {
+ _bytesCreditLimit = bytesCreditLimit;
+ _messageCreditLimit = messageCreditLimit;
+
+ setSuspended(!hasCredit());
+
+ }
+
+
+ public long getMessageCredit()
+ {
+ return _messageCreditLimit == -1L
+ ? Long.MAX_VALUE
+ : _messageUsed < _messageCreditLimit ? _messageCreditLimit - _messageUsed : 0L;
+ }
+
+ public long getBytesCredit()
+ {
+ return _bytesCreditLimit == -1L
+ ? Long.MAX_VALUE
+ : _bytesUsed < _bytesCreditLimit ? _bytesCreditLimit - _bytesUsed : 0L;
+ }
+
+ public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
+ {
+ boolean notifyIncrease = true;
+ if(_messageCreditLimit > 0L)
+ {
+ notifyIncrease = (_messageUsed != _messageCreditLimit);
+ _messageUsed -= messageCredit;
+
+ //TODO log warning
+ if(_messageUsed < 0L)
+ {
+ _messageUsed = 0;
+ }
+ }
+
+
+
+ if(_bytesCreditLimit > 0L)
+ {
+ notifyIncrease = notifyIncrease && bytesCredit>0;
+ _bytesUsed -= bytesCredit;
+
+ //TODO log warning
+ if(_bytesUsed < 0L)
+ {
+ _bytesUsed = 0;
+ }
+
+ if(notifyIncrease)
+ {
+ notifyIncreaseBytesCredit();
+ }
+ }
+
+
+
+ setSuspended(!hasCredit());
+
+ }
+
+
+
+ public synchronized boolean hasCredit()
+ {
+ return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
+ && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
+ }
+
+ public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ {
+ if(_messageCreditLimit >= 0L)
+ {
+ if(_messageUsed < _messageCreditLimit)
+ {
+ if(_bytesCreditLimit < 0L)
+ {
+ _messageUsed++;
+
+ return true;
+ }
+ else if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+ {
+ _messageUsed++;
+ _bytesUsed += msg.getSize();
+
+ return true;
+ }
+ else
+ {
+ //setSuspended(true);
+ return false;
+ }
+ }
+ else
+ {
+ setSuspended(true);
+ return false;
+ }
+ }
+ else if(_bytesCreditLimit >= 0L)
+ {
+ if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+ {
+ _bytesUsed += msg.getSize();
+
+ return true;
+ }
+ else
+ {
+ //setSuspended(true);
+ return false;
+ }
+
+ }
+ else
+ {
+ return true;
+ }
+
+ }
+
+ public void stop()
+ {
+ if(_bytesCreditLimit > 0)
+ {
+ _bytesCreditLimit = 0;
+ }
+ if(_messageCreditLimit > 0)
+ {
+ _messageCreditLimit = 0;
+ }
+
+ }
+
+ public synchronized void addCredit(long bytes, long count)
+ {
+ if(bytes > 0)
+ {
+ _bytesCreditLimit += bytes;
+ }
+ else if(bytes == -1)
+ {
+ _bytesCreditLimit = -1;
+ }
+
+
+ if(count > 0)
+ {
+ _messageCreditLimit += count;
+ }
+ else if(count == -1)
+ {
+ _messageCreditLimit = -1;
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 485c8bd96a..3d694b7137 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -439,7 +439,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (!sub.wouldSuspend(entry))
{
- if (!sub.isBrowser() && !entry.acquire(sub))
+ if (sub.acquires() && !entry.acquire(sub))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
@@ -556,7 +556,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
Subscription sub = subscriberIter.getNode().getSubscription();
// we don't make browsers send the same stuff twice
- if (!sub.isBrowser())
+ if (sub.seesRequeues())
{
updateLastSeenEntry(sub, entry);
}
@@ -1255,7 +1255,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (!sub.wouldSuspend(node))
{
- if (!sub.isBrowser() && !node.acquire(sub))
+ if (sub.acquires() && !node.acquire(sub))
{
sub.restoreCredit(node);
}
@@ -1263,7 +1263,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
deliverMessage(sub, node);
- if (sub.isBrowser())
+ if (!sub.acquires())
{
QueueEntry newNode = _entries.next(node);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index cc6b00609a..a99ca3b118 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -59,7 +59,9 @@ public interface Subscription
boolean isClosed();
- boolean isBrowser();
+ boolean acquires();
+
+ boolean seesRequeues();
void close();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 5bb746e55f..382cda08da 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -33,7 +33,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQMessage;
@@ -547,7 +546,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public void restoreCredit(final QueueEntry queueEntry)
{
- _creditManager.addCredit(1, queueEntry.getSize());
+ _creditManager.restoreCredit(1, queueEntry.getSize());
}
@@ -626,4 +625,16 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
}
+
+ public boolean acquires()
+ {
+ return !isBrowser();
+ }
+
+ public boolean seesRequeues()
+ {
+ return !isBrowser();
+ }
+
+ abstract boolean isBrowser();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index bdb3bf1b86..2004478ed4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -22,11 +22,17 @@ package org.apache.qpid.server.subscription;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.FailedDequeueException;
+import org.apache.qpid.server.queue.MessageCleanupException;
import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.CreditCreditManager;
+import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.*;
@@ -35,10 +41,9 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
-import sun.awt.X11.XSystemTrayPeer;
-
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener
{
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
@@ -49,7 +54,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private final AtomicBoolean _deleted = new AtomicBoolean(false);
- private FlowCreditManager _creditManager;
+ private FlowCreditManager_0_10 _creditManager;
private StateListener _stateListener = new StateListener()
@@ -66,11 +71,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private final FilterManager _filters;
private final MessageAcceptMode _acceptMode;
private final MessageAcquireMode _acquireMode;
+ private MessageFlowMode _flowMode;
private final ServerSession _session;
+ private AtomicBoolean _stopped = new AtomicBoolean(true);
+ private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode, FlowCreditManager creditManager, FilterManager filters)
+ MessageAcquireMode acquireMode, FlowCreditManager_0_10 creditManager, FilterManager filters)
{
_session = session;
_destination = destination;
@@ -159,7 +167,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public boolean isBrowser()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
+ }
+
+ public boolean seesRequeues()
+ {
+ return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT;
}
public void close()
@@ -218,7 +231,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
- public void send(QueueEntry entry) throws AMQException
+ public void send(final QueueEntry entry) throws AMQException
{
ServerMessage serverMsg = entry.getMessage();
@@ -278,14 +291,78 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
deliveryProps.setRedelivered(entry.isRedelivered());
newHeaders.add(deliveryProps);
- xfr.setHeader(new Header(newHeaders));
+ xfr.setHeader(new Header(newHeaders));
+
+ if(_acceptMode == MessageAcceptMode.NONE)
+ {
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry));
+ }
+
_session.sendMessage(xfr);
+ if(_acceptMode == MessageAcceptMode.EXPLICIT)
+ {
+ // potential race condition if incomming commands on this session can be processed on a different thread
+ // to this one (i.e. the message is only put in the map *after* it has been sent, theoretically we could get
+ // acknowledgement back before reaching the next line)
+ _session.onMessageDispositionChange(xfr, new ServerSession.MessageDispositionChangeListener()
+ {
+ public void onAccept()
+ {
+ acknowledge(entry);
+ }
+
+ public void onRelease()
+ {
+ release(entry);
+ }
+
+ public void onReject()
+ {
+ reject(entry);
+ }
+ });
+ }
+ else
+ {
+ _session.onMessageDispositionChange(xfr, new ServerSession.MessageDispositionChangeListener()
+ {
+ public void onAccept()
+ {
+ // TODO : should log error of explicit accept on non-explicit sub
+ }
+
+ public void onRelease()
+ {
+ release(entry);
+ }
+
+ public void onReject()
+ {
+ reject(entry);
+ }
+
+ });
+ }
+
+
+ }
+
+ private void reject(QueueEntry entry)
+ {
+ entry.setRedelivered(true);
+ entry.reject(this);
}
+ private void release(QueueEntry entry)
+ {
+ entry.setRedelivered(true);
+ entry.release();
+ }
+
public void queueDeleted(AMQQueue queue)
{
_deleted.set(true);
@@ -308,7 +385,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public void restoreCredit(QueueEntry queueEntry)
{
- _creditManager.addCredit(1, queueEntry.getSize());
+ _creditManager.restoreCredit(1, queueEntry.getSize());
}
public void setStateListener(StateListener listener)
@@ -342,20 +419,92 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
- public FlowCreditManager getCreditManager()
+ public FlowCreditManager_0_10 getCreditManager()
{
return _creditManager;
}
- public void setCreditManager(FlowCreditManager creditManager)
+
+ public void stop()
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ _stopped.set(true);
+ }
+
+ public void addCredit(MessageCreditUnit unit, long value)
+ {
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+
+ switch (unit)
+ {
+ case MESSAGE:
+
+ creditManager.addCredit(value, 0L);
+ break;
+ case BYTE:
+ creditManager.addCredit(0L, value);
+ break;
+ }
+
+ _stopped.set(false);
+
+ if(creditManager.hasCredit())
+ {
+ if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ {
+ _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ }
+ }
+
+ }
+
+ public void setFlowMode(MessageFlowMode flowMode)
{
+
_creditManager.removeListener(this);
- _creditManager = creditManager;
+ switch(flowMode)
+ {
+ case CREDIT:
+ _creditManager = new CreditCreditManager(0l,0l);
+ break;
+ case WINDOW:
+ _creditManager = new WindowCreditManager(0l,0l);
+ break;
+ default:
+ throw new RuntimeException("Unknown message flow mode: " + flowMode);
+ }
+ _creditManager.addStateListener(this);
+ }
- creditManager.addStateListener(this);
+ public boolean isStopped()
+ {
+ return _stopped.get();
+ }
+ public boolean acquires()
+ {
+ return _acquireMode == MessageAcquireMode.PRE_ACQUIRED;
}
+ public void acknowledge(QueueEntry entry)
+ {
+ // TODO Fix Store Context / cleanup
+ try
+ {
+ entry.discard(new StoreContext());
+ }
+ catch (FailedDequeueException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (MessageCleanupException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index d8ec0d881e..bbc2396063 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -21,15 +21,31 @@
package org.apache.qpid.server.transport;
import org.apache.qpid.transport.*;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.AMQException;
-import java.util.ArrayList;
+import java.util.*;
+import java.util.concurrent.ConcurrentSkipListMap;
+import static org.apache.qpid.util.Serial.*;
public class ServerSession extends Session
{
+
+
+ public static interface MessageDispositionChangeListener
+ {
+ public void onAccept();
+
+ public void onRelease();
+
+ public void onReject();
+ }
+
+
+ private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
+ new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
+
ServerSession(Connection connection, Binary name, long expiry)
{
super(connection, name, expiry);
@@ -58,8 +74,112 @@ public class ServerSession extends Session
}
}
+
public void sendMessage(MessageTransfer xfr)
{
invoke(xfr);
}
+
+ public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
+ {
+ _messageDispositionListenerMap.put(xfr.getId(), acceptListener);
+ }
+
+
+ private static interface MessageDispositionAction
+ {
+ void performAction(MessageDispositionChangeListener listener);
+ }
+
+ public void accept(RangeSet ranges)
+ {
+ dispositionChange(ranges, new MessageDispositionAction()
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onAccept();
+ }
+ });
+ }
+
+
+ public void release(RangeSet ranges)
+ {
+ dispositionChange(ranges, new MessageDispositionAction()
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onRelease();
+ }
+ });
+ }
+
+ public void reject(RangeSet ranges)
+ {
+ dispositionChange(ranges, new MessageDispositionAction()
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onReject();
+ }
+ });
+ }
+
+ public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
+ {
+ if(!_messageDispositionListenerMap.isEmpty())
+ {
+ Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
+ Iterator<Range> rangeIter = ranges.iterator();
+
+ if(rangeIter.hasNext())
+ {
+ Range range = rangeIter.next();
+
+ while(range != null && unacceptedMessages.hasNext())
+ {
+ int next = unacceptedMessages.next();
+ while(gt(next, range.getUpper()))
+ {
+ if(rangeIter.hasNext())
+ {
+ range = rangeIter.next();
+ }
+ else
+ {
+ range = null;
+ break;
+ }
+ }
+ if(range != null && range.includes(next))
+ {
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
+ action.performAction(changeListener);
+ }
+
+
+ }
+
+ }
+
+
+ }
+ }
+
+ public void removeDispositionListener(Method method)
+ {
+ _messageDispositionListenerMap.remove(method.getId());
+ }
+
+ public void releaseAll()
+ {
+ for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
+ {
+ listener.onRelease();
+ }
+ _messageDispositionListenerMap.clear();
+ }
+
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index a178cba78b..2d224d721a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -27,9 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.flow.*;
import org.apache.qpid.AMQException;
@@ -37,12 +35,11 @@ import org.apache.qpid.AMQException;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
-import java.nio.ByteBuffer;
public class ServerSessionDelegate extends SessionDelegate
{
private final IApplicationRegistry _appRegistry;
- private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>();
+ private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>();
public ServerSessionDelegate(IApplicationRegistry appRegistry)
{
@@ -62,7 +59,7 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageAccept(Session session, MessageAccept method)
{
- super.messageAccept(session, method);
+ ((ServerSession)session).accept(method.getTransfers());
}
@@ -70,13 +67,13 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageReject(Session session, MessageReject method)
{
- super.messageReject(session, method);
+ ((ServerSession)session).reject(method.getTransfers());
}
@Override
public void messageRelease(Session session, MessageRelease method)
{
- super.messageRelease(session, method);
+ ((ServerSession)session).release(method.getTransfers());
}
@Override
@@ -102,7 +99,8 @@ public class ServerSessionDelegate extends SessionDelegate
//TODO null check
- FlowCreditManager creditManager = new MessageOnlyCreditManager(0L);
+
+ FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
// TODO filters
@@ -439,6 +437,36 @@ public class ServerSessionDelegate extends SessionDelegate
super.queueQuery(session, method);
}
+ @Override
+ public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm)
+ {
+ String destination = sfm.getDestination();
+
+ Subscription_0_10 sub = _subscriptions.get(destination);
+
+ // TODO null check
+
+ if(sub.isStopped())
+ {
+ sub.setFlowMode(sfm.getFlowMode());
+ }
+
+
+
+ }
+
+ @Override
+ public void messageStop(Session ssn, MessageStop stop)
+ {
+ String destination = stop.getDestination();
+
+ Subscription_0_10 sub = _subscriptions.get(destination);
+
+ // TODO null check
+
+ sub.stop();
+
+ }
@Override
public void messageFlow(Session ssn, MessageFlow flow)
@@ -447,12 +475,20 @@ public class ServerSessionDelegate extends SessionDelegate
Subscription_0_10 sub = _subscriptions.get(destination);
- FlowCreditManager creditManager = sub.getCreditManager();
+ // TODO null check
- if(flow.getUnit() == MessageCreditUnit.MESSAGE)
+ sub.addCredit(flow.getUnit(), flow.getValue());
+
+ }
+
+ @Override
+ public void closed(Session session)
+ {
+ super.closed(session);
+ for(Subscription_0_10 sub : _subscriptions.values())
{
- creditManager.addCredit(flow.getValue(), 0L);
+ sub.close();
+ ((ServerSession)session).releaseAll();
}
-
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index c8ca126136..51d9bd8be2 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -119,6 +119,16 @@ public class MockSubscription implements Subscription
return _closed;
}
+ public boolean acquires()
+ {
+ return true;
+ }
+
+ public boolean seesRequeues()
+ {
+ return true;
+ }
+
public boolean isSuspended()
{
return false;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
index e1e1f846cb..4b7f711bff 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -35,6 +35,7 @@ import static org.apache.qpid.transport.util.Functions.*;
public abstract class Method extends Struct implements ProtocolEvent
{
+
public static final Method create(int type)
{
// XXX: should generate separate factories for separate
@@ -43,12 +44,18 @@ public abstract class Method extends Struct implements ProtocolEvent
}
// XXX: command subclass?
+ public static interface CompletionListener
+ {
+ public void onComplete(Method method);
+ }
+
private int id;
private int channel;
private boolean idSet = false;
private boolean sync = false;
private boolean batch = false;
private boolean unreliable = false;
+ private CompletionListener completionListener;
public final int getId()
{
@@ -61,6 +68,11 @@ public abstract class Method extends Struct implements ProtocolEvent
this.idSet = true;
}
+ boolean idSet()
+ {
+ return idSet;
+ }
+
public final int getChannel()
{
return channel;
@@ -152,6 +164,21 @@ public abstract class Method extends Struct implements ProtocolEvent
}
}
+
+ public void setCompletionListener(CompletionListener completionListener)
+ {
+ this.completionListener = completionListener;
+ }
+
+ public void complete()
+ {
+ if(completionListener!= null)
+ {
+ completionListener.onComplete(this);
+ completionListener = null;
+ }
+ }
+
public String toString()
{
StringBuilder str = new StringBuilder();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
index 9b2744ee8b..3850dc162b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
@@ -52,6 +52,11 @@ public final class RangeSet implements Iterable<Range>
return ranges.getFirst();
}
+ public Range getLast()
+ {
+ return ranges.getLast();
+ }
+
public boolean includes(Range range)
{
for (Range r : this)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index a8757bcb3c..6a69c62300 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -448,7 +448,7 @@ public class Session extends SessionInvoker
}
}
- boolean complete(int lower, int upper)
+ protected boolean complete(int lower, int upper)
{
//avoid autoboxing
if(log.isDebugEnabled())
@@ -465,8 +465,9 @@ public class Session extends SessionInvoker
if (m != null)
{
commandBytes -= m.getBodySize();
+ m.complete();
+ commands[idx] = null;
}
- commands[idx] = null;
}
if (le(lower, maxComplete + 1))
{
@@ -561,7 +562,8 @@ public class Session extends SessionInvoker
"(state=%s)", state));
}
- int next = commandsOut++;
+ int next;
+ next = commandsOut++;
m.setId(next);
if (isFull(next))
@@ -918,6 +920,14 @@ public class Session extends SessionInvoker
}
}
}
+ if(state == CLOSED)
+ {
+ delegate.closed(this);
+ }
+ else
+ {
+ delegate.detached(this);
+ }
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index c8d0855607..6146f029b2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -184,4 +184,11 @@ public class SessionDelegate
}
}
+ public void closed(Session session)
+ {
+ }
+
+ public void detached(Session session)
+ {
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 73cb8cd2cd..9271e1ce16 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -210,6 +210,16 @@ public class SubscriptionTestHelper implements Subscription
return false;
}
+ public boolean acquires()
+ {
+ return true;
+ }
+
+ public boolean seesRequeues()
+ {
+ return true;
+ }
+
public boolean isBrowser()
{
return false;