diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-08-24 20:37:19 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-08-24 20:37:19 +0000 |
commit | e2fc7fa318d76d1fbcf14c60e43e2ba7cb431bb3 (patch) | |
tree | 8ea147f7cc7c5b997686a14810e6a9c72afd3bb1 | |
parent | 3cf863f0367964e05b75ce790488a237381fcddf (diff) | |
download | qpid-python-e2fc7fa318d76d1fbcf14c60e43e2ba7cb431bb3.tar.gz |
Implement 0-10 flow control
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@807369 13f79535-47bb-0310-9956-ffa450edef68
20 files changed, 812 insertions, 44 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java index 8a9d547b55..c5f2d1e808 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java @@ -43,7 +43,7 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager return _bytesCredit.get(); } - public void addCredit(long messageCredit, long bytesCredit) + public void restoreCredit(long messageCredit, long bytesCredit) { _bytesCredit.addAndGet(bytesCredit); setSuspended(false); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java new file mode 100644 index 0000000000..dec0bca576 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.flow; + +import org.apache.qpid.server.message.ServerMessage; + +public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 +{ + private volatile long _bytesCredit; + private volatile long _messageCredit; + + + public CreditCreditManager() + { + this(0L, 0L); + } + + public CreditCreditManager(long bytesCredit, long messageCredit) + { + _bytesCredit = bytesCredit; + _messageCredit = messageCredit; + } + + + public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit) + { + _bytesCredit = bytesCredit; + _messageCredit = messageCredit; + + setSuspended(!hasCredit()); + + } + + + public long getMessageCredit() + { + return _messageCredit == -1L + ? Long.MAX_VALUE + : _messageCredit; + } + + public long getBytesCredit() + { + return _bytesCredit == -1L + ? Long.MAX_VALUE + : _bytesCredit; + } + + public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) + { + + } + + + public synchronized void addCredit(final long messageCredit, final long bytesCredit) + { + boolean notifyIncrease = true; + if(_messageCredit >= 0L && messageCredit > 0L) + { + notifyIncrease = _messageCredit != 0L; + _messageCredit += messageCredit; + } + + + + if(_bytesCredit >= 0L && bytesCredit > 0L) + { + notifyIncrease = notifyIncrease && bytesCredit>0; + _bytesCredit += bytesCredit; + + + + if(notifyIncrease) + { + notifyIncreaseBytesCredit(); + } + } + + + + setSuspended(!hasCredit()); + + } + + + + public synchronized boolean hasCredit() + { + // Note !=, if credit is < 0 that indicates infinite credit + return (_bytesCredit != 0L && _messageCredit != 0L); + } + + public synchronized boolean useCreditForMessage(final ServerMessage msg) + { + if(_messageCredit >= 0L) + { + if(_messageCredit > 0) + { + if(_bytesCredit < 0L) + { + _messageCredit--; + + return true; + } + else if(msg.getSize() <= _bytesCredit) + { + _messageCredit--; + _bytesCredit -= msg.getSize(); + + return true; + } + else + { + //setSuspended(true); + return false; + } + } + else + { + setSuspended(true); + return false; + } + } + else if(_bytesCredit >= 0L) + { + if(msg.getSize() <= _bytesCredit) + { + _bytesCredit -= msg.getSize(); + + return true; + } + else + { + //setSuspended(true); + return false; + } + + } + else + { + return true; + } + + } + + public synchronized void stop() + { + if(_bytesCredit > 0) + { + _bytesCredit = 0; + } + if(_messageCredit > 0) + { + _messageCredit = 0; + } + + } + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index 96e86da54a..59c3395929 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -38,11 +38,10 @@ public interface FlowCreditManager boolean removeListener(FlowCreditManagerListener listener); - public void addCredit(long messageCredit, long bytesCredit); - - public void removeAllCredit(); + public void restoreCredit(long messageCredit, long bytesCredit); public boolean hasCredit(); public boolean useCreditForMessage(ServerMessage msg); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java index 3802dcf0f2..901b71fd1f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java @@ -34,7 +34,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements return -1L; } - public void addCredit(long messageCredit, long bytesCredit) + public void restoreCredit(long messageCredit, long bytesCredit) { } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java index dcbb37c153..19a9ac1d23 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java @@ -43,7 +43,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl return _bytesCredit; } - public synchronized void addCredit(long messageCredit, long bytesCredit) + public synchronized void restoreCredit(long messageCredit, long bytesCredit) { _messageCredit += messageCredit; _bytesCredit += bytesCredit; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java index 3c84af2228..a386f66b11 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java @@ -43,7 +43,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen return -1L; } - public void addCredit(long messageCredit, long bytesCredit) + public void restoreCredit(long messageCredit, long bytesCredit) { _messageCredit.addAndGet(messageCredit); setSuspended(false); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java index 77bbc82a14..026804439c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java @@ -91,7 +91,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F return _bytesCredit; } - public synchronized void addCredit(final long messageCredit, final long bytesCredit) + public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { final long messageCreditLimit = _messageCreditLimit; boolean notifyIncrease = true; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java new file mode 100644 index 0000000000..7b91894526 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java @@ -0,0 +1,204 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.flow; + +import org.apache.qpid.server.message.ServerMessage; + +public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 +{ + private volatile long _bytesCreditLimit; + private volatile long _messageCreditLimit; + + private volatile long _bytesUsed; + private volatile long _messageUsed; + + public WindowCreditManager() + { + this(0L, 0L); + } + + public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit) + { + _bytesCreditLimit = bytesCreditLimit; + _messageCreditLimit = messageCreditLimit; + } + + + public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) + { + _bytesCreditLimit = bytesCreditLimit; + _messageCreditLimit = messageCreditLimit; + + setSuspended(!hasCredit()); + + } + + + public long getMessageCredit() + { + return _messageCreditLimit == -1L + ? Long.MAX_VALUE + : _messageUsed < _messageCreditLimit ? _messageCreditLimit - _messageUsed : 0L; + } + + public long getBytesCredit() + { + return _bytesCreditLimit == -1L + ? Long.MAX_VALUE + : _bytesUsed < _bytesCreditLimit ? _bytesCreditLimit - _bytesUsed : 0L; + } + + public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) + { + boolean notifyIncrease = true; + if(_messageCreditLimit > 0L) + { + notifyIncrease = (_messageUsed != _messageCreditLimit); + _messageUsed -= messageCredit; + + //TODO log warning + if(_messageUsed < 0L) + { + _messageUsed = 0; + } + } + + + + if(_bytesCreditLimit > 0L) + { + notifyIncrease = notifyIncrease && bytesCredit>0; + _bytesUsed -= bytesCredit; + + //TODO log warning + if(_bytesUsed < 0L) + { + _bytesUsed = 0; + } + + if(notifyIncrease) + { + notifyIncreaseBytesCredit(); + } + } + + + + setSuspended(!hasCredit()); + + } + + + + public synchronized boolean hasCredit() + { + return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed) + && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed); + } + + public synchronized boolean useCreditForMessage(final ServerMessage msg) + { + if(_messageCreditLimit >= 0L) + { + if(_messageUsed < _messageCreditLimit) + { + if(_bytesCreditLimit < 0L) + { + _messageUsed++; + + return true; + } + else if(_bytesUsed + msg.getSize() <= _bytesCreditLimit) + { + _messageUsed++; + _bytesUsed += msg.getSize(); + + return true; + } + else + { + //setSuspended(true); + return false; + } + } + else + { + setSuspended(true); + return false; + } + } + else if(_bytesCreditLimit >= 0L) + { + if(_bytesUsed + msg.getSize() <= _bytesCreditLimit) + { + _bytesUsed += msg.getSize(); + + return true; + } + else + { + //setSuspended(true); + return false; + } + + } + else + { + return true; + } + + } + + public void stop() + { + if(_bytesCreditLimit > 0) + { + _bytesCreditLimit = 0; + } + if(_messageCreditLimit > 0) + { + _messageCreditLimit = 0; + } + + } + + public synchronized void addCredit(long bytes, long count) + { + if(bytes > 0) + { + _bytesCreditLimit += bytes; + } + else if(bytes == -1) + { + _bytesCreditLimit = -1; + } + + + if(count > 0) + { + _messageCreditLimit += count; + } + else if(count == -1) + { + _messageCreditLimit = -1; + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 485c8bd96a..3d694b7137 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -439,7 +439,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (!sub.wouldSuspend(entry)) { - if (!sub.isBrowser() && !entry.acquire(sub)) + if (sub.acquires() && !entry.acquire(sub)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage // to acquire the entry for this subscription @@ -556,7 +556,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener Subscription sub = subscriberIter.getNode().getSubscription(); // we don't make browsers send the same stuff twice - if (!sub.isBrowser()) + if (sub.seesRequeues()) { updateLastSeenEntry(sub, entry); } @@ -1255,7 +1255,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (!sub.wouldSuspend(node)) { - if (!sub.isBrowser() && !node.acquire(sub)) + if (sub.acquires() && !node.acquire(sub)) { sub.restoreCredit(node); } @@ -1263,7 +1263,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { deliverMessage(sub, node); - if (sub.isBrowser()) + if (!sub.acquires()) { QueueEntry newNode = _entries.next(node); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index cc6b00609a..a99ca3b118 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -59,7 +59,9 @@ public interface Subscription boolean isClosed(); - boolean isBrowser(); + boolean acquires(); + + boolean seesRequeues(); void close(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 5bb746e55f..382cda08da 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -33,7 +33,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQMessage; @@ -547,7 +546,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public void restoreCredit(final QueueEntry queueEntry) { - _creditManager.addCredit(1, queueEntry.getSize()); + _creditManager.restoreCredit(1, queueEntry.getSize()); } @@ -626,4 +625,16 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter(); converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag()); } + + public boolean acquires() + { + return !isBrowser(); + } + + public boolean seesRequeues() + { + return !isBrowser(); + } + + abstract boolean isBrowser(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index bdb3bf1b86..2004478ed4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -22,11 +22,17 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.FailedDequeueException; +import org.apache.qpid.server.queue.MessageCleanupException; import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.CreditCreditManager; +import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.transport.ServerSession; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; import org.apache.qpid.transport.*; @@ -35,10 +41,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; -import sun.awt.X11.XSystemTrayPeer; - public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener { private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); @@ -49,7 +54,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final AtomicBoolean _deleted = new AtomicBoolean(false); - private FlowCreditManager _creditManager; + private FlowCreditManager_0_10 _creditManager; private StateListener _stateListener = new StateListener() @@ -66,11 +71,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final FilterManager _filters; private final MessageAcceptMode _acceptMode; private final MessageAcquireMode _acquireMode; + private MessageFlowMode _flowMode; private final ServerSession _session; + private AtomicBoolean _stopped = new AtomicBoolean(true); + private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>(); public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode, - MessageAcquireMode acquireMode, FlowCreditManager creditManager, FilterManager filters) + MessageAcquireMode acquireMode, FlowCreditManager_0_10 creditManager, FilterManager filters) { _session = session; _destination = destination; @@ -159,7 +167,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public boolean isBrowser() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return _acquireMode == MessageAcquireMode.NOT_ACQUIRED; + } + + public boolean seesRequeues() + { + return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT; } public void close() @@ -218,7 +231,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } - public void send(QueueEntry entry) throws AMQException + public void send(final QueueEntry entry) throws AMQException { ServerMessage serverMsg = entry.getMessage(); @@ -278,14 +291,78 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr deliveryProps.setRedelivered(entry.isRedelivered()); newHeaders.add(deliveryProps); - xfr.setHeader(new Header(newHeaders)); + xfr.setHeader(new Header(newHeaders)); + + if(_acceptMode == MessageAcceptMode.NONE) + { + xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry)); + } + _session.sendMessage(xfr); + if(_acceptMode == MessageAcceptMode.EXPLICIT) + { + // potential race condition if incomming commands on this session can be processed on a different thread + // to this one (i.e. the message is only put in the map *after* it has been sent, theoretically we could get + // acknowledgement back before reaching the next line) + _session.onMessageDispositionChange(xfr, new ServerSession.MessageDispositionChangeListener() + { + public void onAccept() + { + acknowledge(entry); + } + + public void onRelease() + { + release(entry); + } + + public void onReject() + { + reject(entry); + } + }); + } + else + { + _session.onMessageDispositionChange(xfr, new ServerSession.MessageDispositionChangeListener() + { + public void onAccept() + { + // TODO : should log error of explicit accept on non-explicit sub + } + + public void onRelease() + { + release(entry); + } + + public void onReject() + { + reject(entry); + } + + }); + } + + + } + + private void reject(QueueEntry entry) + { + entry.setRedelivered(true); + entry.reject(this); } + private void release(QueueEntry entry) + { + entry.setRedelivered(true); + entry.release(); + } + public void queueDeleted(AMQQueue queue) { _deleted.set(true); @@ -308,7 +385,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void restoreCredit(QueueEntry queueEntry) { - _creditManager.addCredit(1, queueEntry.getSize()); + _creditManager.restoreCredit(1, queueEntry.getSize()); } public void setStateListener(StateListener listener) @@ -342,20 +419,92 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } - public FlowCreditManager getCreditManager() + public FlowCreditManager_0_10 getCreditManager() { return _creditManager; } - public void setCreditManager(FlowCreditManager creditManager) + + public void stop() + { + if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + { + _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + } + _stopped.set(true); + } + + public void addCredit(MessageCreditUnit unit, long value) + { + FlowCreditManager_0_10 creditManager = getCreditManager(); + + switch (unit) + { + case MESSAGE: + + creditManager.addCredit(value, 0L); + break; + case BYTE: + creditManager.addCredit(0L, value); + break; + } + + _stopped.set(false); + + if(creditManager.hasCredit()) + { + if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) + { + _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); + } + } + + } + + public void setFlowMode(MessageFlowMode flowMode) { + _creditManager.removeListener(this); - _creditManager = creditManager; + switch(flowMode) + { + case CREDIT: + _creditManager = new CreditCreditManager(0l,0l); + break; + case WINDOW: + _creditManager = new WindowCreditManager(0l,0l); + break; + default: + throw new RuntimeException("Unknown message flow mode: " + flowMode); + } + _creditManager.addStateListener(this); + } - creditManager.addStateListener(this); + public boolean isStopped() + { + return _stopped.get(); + } + public boolean acquires() + { + return _acquireMode == MessageAcquireMode.PRE_ACQUIRED; } + public void acknowledge(QueueEntry entry) + { + // TODO Fix Store Context / cleanup + try + { + entry.discard(new StoreContext()); + } + catch (FailedDequeueException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (MessageCleanupException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index d8ec0d881e..bbc2396063 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -21,15 +21,31 @@ package org.apache.qpid.server.transport; import org.apache.qpid.transport.*; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.AMQException; -import java.util.ArrayList; +import java.util.*; +import java.util.concurrent.ConcurrentSkipListMap; +import static org.apache.qpid.util.Serial.*; public class ServerSession extends Session { + + + public static interface MessageDispositionChangeListener + { + public void onAccept(); + + public void onRelease(); + + public void onReject(); + } + + + private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = + new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); + ServerSession(Connection connection, Binary name, long expiry) { super(connection, name, expiry); @@ -58,8 +74,112 @@ public class ServerSession extends Session } } + public void sendMessage(MessageTransfer xfr) { invoke(xfr); } + + public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) + { + _messageDispositionListenerMap.put(xfr.getId(), acceptListener); + } + + + private static interface MessageDispositionAction + { + void performAction(MessageDispositionChangeListener listener); + } + + public void accept(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onAccept(); + } + }); + } + + + public void release(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onRelease(); + } + }); + } + + public void reject(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onReject(); + } + }); + } + + public void dispositionChange(RangeSet ranges, MessageDispositionAction action) + { + if(!_messageDispositionListenerMap.isEmpty()) + { + Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); + Iterator<Range> rangeIter = ranges.iterator(); + + if(rangeIter.hasNext()) + { + Range range = rangeIter.next(); + + while(range != null && unacceptedMessages.hasNext()) + { + int next = unacceptedMessages.next(); + while(gt(next, range.getUpper())) + { + if(rangeIter.hasNext()) + { + range = rangeIter.next(); + } + else + { + range = null; + break; + } + } + if(range != null && range.includes(next)) + { + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next); + action.performAction(changeListener); + } + + + } + + } + + + } + } + + public void removeDispositionListener(Method method) + { + _messageDispositionListenerMap.remove(method.getId()); + } + + public void releaseAll() + { + for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) + { + listener.onRelease(); + } + _messageDispositionListenerMap.clear(); + } + + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index a178cba78b..2d224d721a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -27,9 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.flow.*; import org.apache.qpid.AMQException; @@ -37,12 +35,11 @@ import org.apache.qpid.AMQException; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; -import java.nio.ByteBuffer; public class ServerSessionDelegate extends SessionDelegate { private final IApplicationRegistry _appRegistry; - private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>(); + private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>(); public ServerSessionDelegate(IApplicationRegistry appRegistry) { @@ -62,7 +59,7 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageAccept(Session session, MessageAccept method) { - super.messageAccept(session, method); + ((ServerSession)session).accept(method.getTransfers()); } @@ -70,13 +67,13 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageReject(Session session, MessageReject method) { - super.messageReject(session, method); + ((ServerSession)session).reject(method.getTransfers()); } @Override public void messageRelease(Session session, MessageRelease method) { - super.messageRelease(session, method); + ((ServerSession)session).release(method.getTransfers()); } @Override @@ -102,7 +99,8 @@ public class ServerSessionDelegate extends SessionDelegate //TODO null check - FlowCreditManager creditManager = new MessageOnlyCreditManager(0L); + + FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L); // TODO filters @@ -439,6 +437,36 @@ public class ServerSessionDelegate extends SessionDelegate super.queueQuery(session, method); } + @Override + public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm) + { + String destination = sfm.getDestination(); + + Subscription_0_10 sub = _subscriptions.get(destination); + + // TODO null check + + if(sub.isStopped()) + { + sub.setFlowMode(sfm.getFlowMode()); + } + + + + } + + @Override + public void messageStop(Session ssn, MessageStop stop) + { + String destination = stop.getDestination(); + + Subscription_0_10 sub = _subscriptions.get(destination); + + // TODO null check + + sub.stop(); + + } @Override public void messageFlow(Session ssn, MessageFlow flow) @@ -447,12 +475,20 @@ public class ServerSessionDelegate extends SessionDelegate Subscription_0_10 sub = _subscriptions.get(destination); - FlowCreditManager creditManager = sub.getCreditManager(); + // TODO null check - if(flow.getUnit() == MessageCreditUnit.MESSAGE) + sub.addCredit(flow.getUnit(), flow.getValue()); + + } + + @Override + public void closed(Session session) + { + super.closed(session); + for(Subscription_0_10 sub : _subscriptions.values()) { - creditManager.addCredit(flow.getValue(), 0L); + sub.close(); + ((ServerSession)session).releaseAll(); } - } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index c8ca126136..51d9bd8be2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -119,6 +119,16 @@ public class MockSubscription implements Subscription return _closed; } + public boolean acquires() + { + return true; + } + + public boolean seesRequeues() + { + return true; + } + public boolean isSuspended() { return false; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java index e1e1f846cb..4b7f711bff 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -35,6 +35,7 @@ import static org.apache.qpid.transport.util.Functions.*; public abstract class Method extends Struct implements ProtocolEvent { + public static final Method create(int type) { // XXX: should generate separate factories for separate @@ -43,12 +44,18 @@ public abstract class Method extends Struct implements ProtocolEvent } // XXX: command subclass? + public static interface CompletionListener + { + public void onComplete(Method method); + } + private int id; private int channel; private boolean idSet = false; private boolean sync = false; private boolean batch = false; private boolean unreliable = false; + private CompletionListener completionListener; public final int getId() { @@ -61,6 +68,11 @@ public abstract class Method extends Struct implements ProtocolEvent this.idSet = true; } + boolean idSet() + { + return idSet; + } + public final int getChannel() { return channel; @@ -152,6 +164,21 @@ public abstract class Method extends Struct implements ProtocolEvent } } + + public void setCompletionListener(CompletionListener completionListener) + { + this.completionListener = completionListener; + } + + public void complete() + { + if(completionListener!= null) + { + completionListener.onComplete(this); + completionListener = null; + } + } + public String toString() { StringBuilder str = new StringBuilder(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java index 9b2744ee8b..3850dc162b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java @@ -52,6 +52,11 @@ public final class RangeSet implements Iterable<Range> return ranges.getFirst(); } + public Range getLast() + { + return ranges.getLast(); + } + public boolean includes(Range range) { for (Range r : this) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index a8757bcb3c..6a69c62300 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -448,7 +448,7 @@ public class Session extends SessionInvoker } } - boolean complete(int lower, int upper) + protected boolean complete(int lower, int upper) { //avoid autoboxing if(log.isDebugEnabled()) @@ -465,8 +465,9 @@ public class Session extends SessionInvoker if (m != null) { commandBytes -= m.getBodySize(); + m.complete(); + commands[idx] = null; } - commands[idx] = null; } if (le(lower, maxComplete + 1)) { @@ -561,7 +562,8 @@ public class Session extends SessionInvoker "(state=%s)", state)); } - int next = commandsOut++; + int next; + next = commandsOut++; m.setId(next); if (isFull(next)) @@ -918,6 +920,14 @@ public class Session extends SessionInvoker } } } + if(state == CLOSED) + { + delegate.closed(this); + } + else + { + delegate.detached(this); + } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index c8d0855607..6146f029b2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -184,4 +184,11 @@ public class SessionDelegate } } + public void closed(Session session) + { + } + + public void detached(Session session) + { + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 73cb8cd2cd..9271e1ce16 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -210,6 +210,16 @@ public class SubscriptionTestHelper implements Subscription return false; } + public boolean acquires() + { + return true; + } + + public boolean seesRequeues() + { + return true; + } + public boolean isBrowser() { return false; |