diff options
author | Keith Wall <kwall@apache.org> | 2014-12-11 17:14:34 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-12-11 17:14:34 +0000 |
commit | d9514d45f11c92aef06b8b880e291f76fdbff2a2 (patch) | |
tree | 410c7d1ca116e70d0777b4c3a9e5a204d287432e /qpid/java | |
parent | 66c5a217c9891e1dfee14eeb093866373293f265 (diff) | |
download | qpid-python-d9514d45f11c92aef06b8b880e291f76fdbff2a2.tar.gz |
Extend credit managers to be aware of transport backpressue
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
29 files changed, 328 insertions, 319 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java deleted file mode 100644 index be3a13d2d3..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ /dev/null @@ -1,87 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -import java.util.concurrent.atomic.AtomicLong; - -public class BytesOnlyCreditManager extends AbstractFlowCreditManager -{ - private final AtomicLong _bytesCredit; - - public BytesOnlyCreditManager(long initialCredit) - { - _bytesCredit = new AtomicLong(initialCredit); - } - - public long getMessageCredit() - { - return -1L; - } - - public long getBytesCredit() - { - return _bytesCredit.get(); - } - - public void restoreCredit(long messageCredit, long bytesCredit) - { - _bytesCredit.addAndGet(bytesCredit); - setSuspended(false); - } - - public void removeAllCredit() - { - _bytesCredit.set(0L); - } - - public boolean hasCredit() - { - return _bytesCredit.get() > 0L; - } - - public boolean useCreditForMessage(long msgSize) - { - if(hasCredit()) - { - if(_bytesCredit.addAndGet(-msgSize) >= 0) - { - return true; - } - else - { - _bytesCredit.addAndGet(msgSize); - setSuspended(true); - return false; - } - } - else - { - return false; - } - - } - - public void setBytesCredit(long bytesCredit) - { - _bytesCredit.set( bytesCredit ); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index 280f2851a4..08aac0b511 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -24,10 +24,6 @@ package org.apache.qpid.server.flow; public interface FlowCreditManager { - long getMessageCredit(); - - long getBytesCredit(); - public static interface FlowCreditManagerListener { void creditStateChanged(boolean hasCredit); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java deleted file mode 100644 index 31c1fda968..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ /dev/null @@ -1,90 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -public class MessageAndBytesCreditManager extends AbstractFlowCreditManager implements FlowCreditManager -{ - private long _messageCredit; - private long _bytesCredit; - - public MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit) - { - _messageCredit = messageCredit; - _bytesCredit = bytesCredit; - } - - public synchronized long getMessageCredit() - { - return _messageCredit; - } - - public synchronized long getBytesCredit() - { - return _bytesCredit; - } - - public synchronized void restoreCredit(long messageCredit, long bytesCredit) - { - _messageCredit += messageCredit; - _bytesCredit += bytesCredit; - setSuspended(hasCredit()); - } - - public synchronized void removeAllCredit() - { - _messageCredit = 0L; - _bytesCredit = 0L; - setSuspended(true); - } - - public synchronized boolean hasCredit() - { - return (_messageCredit > 0L) && ( _bytesCredit > 0L ); - } - - public synchronized boolean useCreditForMessage(final long msgSize) - { - if(_messageCredit == 0L) - { - setSuspended(true); - return false; - } - else - { - if(msgSize > _bytesCredit) - { - setSuspended(true); - return false; - } - _messageCredit--; - _bytesCredit -= msgSize; - setSuspended(false); - return true; - } - - } - - public synchronized void setBytesCredit(long bytesCredit) - { - _bytesCredit = bytesCredit; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index f13af479ad..40aa1bbafd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -113,4 +113,6 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo * @return the time of the last activity or 0 if not in a transaction */ long getTransactionUpdateTime(); + + void transportStateChanged(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 6ea9f3600c..3c25e0934c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -157,6 +157,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getSubject(); } + @Override + public boolean isTransportBlockedForWriting() + { + return _delegate.isTransportBlockedForWriting(); + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _delegate.setTransportBlockedForWriting(blocked); + } + private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) @@ -268,6 +280,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { return new Subject(); } + + @Override + public boolean isTransportBlockedForWriting() + { + return false; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + } } private class SelfDelegateProtocolEngine implements ServerProtocolEngine @@ -408,6 +431,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getSubject(); } + @Override + public boolean isTransportBlockedForWriting() + { + return false; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + } + public void exception(Throwable t) { _logger.error("Error establishing session", t); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 1c42d9b6fe..47ed224133 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -456,6 +456,12 @@ public class MockConsumer implements ConsumerTarget { return 0; } + + @Override + public void transportStateChanged() + { + + } } private static class MockConnectionModel implements AMQConnectionModel @@ -663,5 +669,7 @@ public class MockConsumer implements ConsumerTarget { } + + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 89d681111b..afa4fb8bc0 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -158,6 +158,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC return _name; } + public void transportStateChanged() + { + _creditManager.restoreCredit(0, 0); + } public static class AddMessageDispositionListenerAction implements Runnable { @@ -555,10 +559,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC switch(flowMode) { case CREDIT: - _creditManager = new CreditCreditManager(0l,0l); + _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine()); break; case WINDOW: - _creditManager = new WindowCreditManager(0l,0l); + _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine()); break; default: // this should never happen, as 0-10 is finalised and so the enum should never change diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java index 8dddac9809..e670c1f88b 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java @@ -21,48 +21,27 @@ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { + private final ServerProtocolEngine _serverProtocolEngine; private volatile long _bytesCredit; private volatile long _messageCredit; - public CreditCreditManager(long bytesCredit, long messageCredit) + public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine) { + _serverProtocolEngine = serverProtocolEngine; _bytesCredit = bytesCredit; _messageCredit = messageCredit; setSuspended(!hasCredit()); } - - public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit) - { - _bytesCredit = bytesCredit; - _messageCredit = messageCredit; - - setSuspended(!hasCredit()); - - } - - - public long getMessageCredit() - { - return _messageCredit == -1L - ? Long.MAX_VALUE - : _messageCredit; - } - - public long getBytesCredit() - { - return _bytesCredit == -1L - ? Long.MAX_VALUE - : _bytesCredit; - } - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { + setSuspended(!hasCredit()); } @@ -107,12 +86,17 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl public synchronized boolean hasCredit() { // Note !=, if credit is < 0 that indicates infinite credit - return (_bytesCredit != 0L && _messageCredit != 0L); + return (_bytesCredit != 0L && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting()); } public synchronized boolean useCreditForMessage(long msgSize) { - if(_messageCredit >= 0L) + if (_serverProtocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCredit >= 0L) { if(_messageCredit > 0) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java index 30aecdb2d2..5c919252b8 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java @@ -86,7 +86,10 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator conn.setRemoteAddress(network.getRemoteAddress()); conn.setLocalAddress(network.getLocalAddress()); - return new ProtocolEngine_0_10( conn, network); + ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network); + conn.setProtocolEngine(protocolEngine); + + return protocolEngine; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 4adf472c5d..cb96870e74 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -54,6 +54,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _createTime = System.currentTimeMillis(); private long _lastReadTime = _createTime; private long _lastWriteTime = _createTime; + private volatile boolean _transportBlockedForWriting; public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) @@ -249,4 +250,18 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { return _connection.getAuthorizedSubject(); } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + } + } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 8567be37f0..cbd569d036 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; @@ -90,6 +91,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private int _messageCompressionThreshold; private int _maxMessageSize; + private ServerProtocolEngine _serverProtocolEngine; + public ServerConnection(final long connectionId, Broker<?> broker, final AmqpPort<?> port, @@ -189,6 +192,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S super.setConnectionDelegate(delegate); } + public ServerProtocolEngine getProtocolEngine() + { + return _serverProtocolEngine; + } + + public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine) + { + _serverProtocolEngine = serverProtocolEngine; + } + public VirtualHostImpl<?,?,?> getVirtualHost() { return _virtualHost; @@ -664,4 +677,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { return _maxMessageSize; } + + public void transportStateChanged() + { + for (AMQSessionModel ssn : getSessionModels()) + { + ssn.transportStateChanged(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 223de4f84e..1d8676edd6 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -874,6 +875,15 @@ public class ServerSession extends Session } @Override + public void transportStateChanged() + { + for(ConsumerTarget_0_10 consumerTarget : getSubscriptions()) + { + consumerTarget.transportStateChanged(); + } + } + + @Override public Object getConnectionReference() { return getConnection().getReference(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index a117ddb0c6..8fdee7a0f7 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -35,6 +35,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; @@ -244,8 +245,8 @@ public class ServerSessionDelegate extends SessionDelegate } else { - - FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); + ServerProtocolEngine serverProtocolEngine = getServerConnection(session).getProtocolEngine(); + FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, serverProtocolEngine); FilterManager filterManager = null; try diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java index 8e48741b91..e11d2ce9bb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java @@ -21,11 +21,14 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; + +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class); + private final ServerProtocolEngine _serverProtocolEngine; private volatile long _bytesCreditLimit; private volatile long _messageCreditLimit; @@ -33,39 +36,22 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl private volatile long _bytesUsed; private volatile long _messageUsed; - public WindowCreditManager() - { - this(0L, 0L); - } - - public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit) + public WindowCreditManager(long bytesCreditLimit, + long messageCreditLimit, + ServerProtocolEngine serverProtocolEngine) { + _serverProtocolEngine = serverProtocolEngine; _bytesCreditLimit = bytesCreditLimit; _messageCreditLimit = messageCreditLimit; setSuspended(!hasCredit()); } - public long getBytesCreditLimit() - { - return _bytesCreditLimit; - } - public long getMessageCreditLimit() { return _messageCreditLimit; } - public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) - { - _bytesCreditLimit = bytesCreditLimit; - _messageCreditLimit = messageCreditLimit; - - setSuspended(!hasCredit()); - - } - - public long getMessageCredit() { return _messageCreditLimit == -1L @@ -121,12 +107,18 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl public synchronized boolean hasCredit() { return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed) - && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed); + && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed) + && !_serverProtocolEngine.isTransportBlockedForWriting(); } public synchronized boolean useCreditForMessage(final long msgSize) { - if(_messageCreditLimit >= 0L) + if (_serverProtocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCreditLimit >= 0L) { if(_messageUsed < _messageCreditLimit) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java index 1c4a694be6..b05edc5d04 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java @@ -20,17 +20,25 @@ */ package org.apache.qpid.server.protocol.v0_10; -import org.apache.qpid.server.protocol.v0_10.WindowCreditManager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.test.utils.QpidTestCase; public class WindowCreditManagerTest extends QpidTestCase { private WindowCreditManager _creditManager; + private ServerProtocolEngine _protocolEngine; protected void setUp() throws Exception { super.setUp(); - _creditManager = new WindowCreditManager(); + + _protocolEngine = mock(ServerProtocolEngine.class); + when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false); + + _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine); } /** diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 7604662980..9a6059ccbf 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -66,8 +66,6 @@ import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -131,7 +129,8 @@ public class AMQChannel private final int _channelId; - private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); + private final Pre0_10CreditManager _creditManager; + private final FlowCreditManager _noAckCreditManager; /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -213,6 +212,9 @@ public class AMQChannel public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { + _creditManager = new Pre0_10CreditManager(0l,0l, connection); + _noAckCreditManager = new NoAckCreditManager(connection); + _connection = connection; _channelId = channelId; @@ -699,7 +701,7 @@ public class AMQChannel if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { - target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _noAckCreditManager); } else if(acks) { @@ -709,7 +711,7 @@ public class AMQChannel } else { - target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _noAckCreditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } @@ -1644,6 +1646,7 @@ public class AMQChannel } } + public synchronized void block(AMQQueue queue) { if(_blockingEntities.add(queue)) @@ -1672,6 +1675,13 @@ public class AMQChannel } @Override + public void transportStateChanged() + { + _creditManager.restoreCredit(0, 0); + _noAckCreditManager.restoreCredit(0, 0); + } + + @Override public Object getConnectionReference() { return getConnection().getReference(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 606649445d..cea9b0930f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -188,6 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private int _currentMethodId; private int _binaryDataLimit; private long _maxMessageSize; + private volatile boolean _transportBlockedForWriting; public AMQProtocolEngine(Broker<?> broker, final NetworkConnection network, @@ -250,6 +251,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _authorizedSubject; } + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + for(AMQChannel channel : _channelMap.values()) + { + channel.transportStateChanged(); + } + } + public void setNetworkConnection(NetworkConnection network) { setNetworkConnection(network, network.getSender()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 43982db2fd..d6642aef2e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -136,12 +136,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } - @Override - public boolean allocateCredit(ServerMessage msg) - { - return true; - } - } public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, @@ -215,12 +209,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } - @Override - public boolean allocateCredit(ServerMessage msg) - { - return true; - } - private static final ServerTransaction.Action NOOP = new ServerTransaction.Action() { @@ -250,11 +238,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } - public boolean allocateCredit(ServerMessage msg) - { - return getCreditManager().useCreditForMessage(msg.getSize()); - } - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java index 1817e8ad31..af54c911dc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java @@ -18,10 +18,13 @@ * under the License. * */ -package org.apache.qpid.server.flow; +package org.apache.qpid.server.protocol.v0_8; import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager { private final AtomicLong _messageCredit; @@ -31,16 +34,6 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen _messageCredit = new AtomicLong(initialCredit); } - public long getMessageCredit() - { - return _messageCredit.get(); - } - - public long getBytesCredit() - { - return -1L; - } - public void restoreCredit(long messageCredit, long bytesCredit) { _messageCredit.addAndGet(messageCredit); @@ -48,12 +41,6 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen } - public void removeAllCredit() - { - setSuspended(true); - _messageCredit.set(0L); - } - public boolean hasCredit() { return _messageCredit.get() > 0L; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java new file mode 100644 index 0000000000..2d32617106 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; + +public class NoAckCreditManager extends AbstractFlowCreditManager +{ + private final ServerProtocolEngine _serverProtocolEngine; + + public NoAckCreditManager(ServerProtocolEngine serverProtocolEngine) + { + _serverProtocolEngine = serverProtocolEngine; + } + + @Override + public void restoreCredit(final long messageCredit, final long bytesCredit) + { + setSuspended(!hasCredit()); + } + + @Override + public boolean hasCredit() + { + return !_serverProtocolEngine.isTransportBlockedForWriting(); + } + + @Override + public boolean useCreditForMessage(final long msgSize) + { + if (!hasCredit()) + { + setSuspended(true); + return false; + } + return true; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java index fc2d4bfb53..e63645ed09 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java @@ -18,20 +18,28 @@ * under the License. * */ -package org.apache.qpid.server.flow; +package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager { + private final ServerProtocolEngine _protocolEngine; private volatile long _bytesCreditLimit; private volatile long _messageCreditLimit; private volatile long _bytesCredit; private volatile long _messageCredit; - public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit) + public Pre0_10CreditManager(long bytesCreditLimit, + long messageCreditLimit, + ServerProtocolEngine protocolEngine) { + _protocolEngine = protocolEngine; _bytesCreditLimit = bytesCreditLimit; _messageCreditLimit = messageCreditLimit; _bytesCredit = bytesCreditLimit; @@ -39,6 +47,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } + public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) { long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit; @@ -80,16 +89,6 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } - public long getMessageCredit() - { - return _messageCredit; - } - - public long getBytesCredit() - { - return _bytesCredit; - } - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { final long messageCreditLimit = _messageCreditLimit; @@ -119,22 +118,21 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } - public synchronized void removeAllCredit() - { - _bytesCredit = 0L; - _messageCredit = 0L; - setSuspended(!hasCredit()); - } - public synchronized boolean hasCredit() { return (_bytesCreditLimit == 0L || _bytesCredit > 0) - && (_messageCreditLimit == 0L || _messageCredit > 0); + && (_messageCreditLimit == 0L || _messageCredit > 0) + && !_protocolEngine.isTransportBlockedForWriting(); } public synchronized boolean useCreditForMessage(final long msgSize) { - if(_messageCreditLimit != 0L) + if (_protocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCreditLimit != 0L) { if(_messageCredit != 0L) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 9326f16703..55fc865850 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -31,8 +31,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.flow.LimitlessCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoredMessage; @@ -328,7 +326,7 @@ public class AckTest extends QpidTestCase public void testMessageDequeueRestoresCreditTest() throws Exception { // Send 10 messages - Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); + Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1, _protocolEngine); _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java index 89fc60666b..c4c89ac24a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java @@ -18,20 +18,14 @@ * under the License. * */ -package org.apache.qpid.server.flow; +package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager { - public long getMessageCredit() - { - return -1L; - } - - public long getBytesCredit() - { - return -1L; - } public void restoreCredit(long messageCredit, long bytesCredit) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 8e24d55da0..b55bd03a91 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -44,6 +44,7 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.End; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; @@ -64,6 +65,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final AmqpPort<?> _port; private final Broker<?> _broker; private final SubjectCreator _subjectCreator; + private final ServerProtocolEngine _protocolEngine; private VirtualHostImpl _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; @@ -101,12 +103,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private boolean _closedOnOpen; + public Connection_1_0(Broker<?> broker, ConnectionEndpoint conn, long connectionId, AmqpPort<?> port, - Transport transport, final SubjectCreator subjectCreator) + Transport transport, + final SubjectCreator subjectCreator, + final ServerProtocolEngine protocolEngine) { + _protocolEngine = protocolEngine; _broker = broker; _port = port; _transport = transport; @@ -363,6 +369,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod return _port; } + public ServerProtocolEngine getProtocolEngine() + { + return _protocolEngine; + } + @Override public Transport getTransport() { @@ -480,4 +491,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } + public void transportStateChanged() + { + for (Session_1_0 session : _sessions) + { + session.transportStateChanged(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index c5d9a5e35d..b5e1bdafbb 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -40,6 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; @@ -84,7 +85,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget public boolean isSuspended() { - return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend(); + return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE; } @@ -290,7 +291,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { synchronized (_link.getLock()) { - final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend(); + + ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine(); + final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting(); if(!hasCredit && getState() == State.ACTIVE) { suspend(); @@ -330,7 +333,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { synchronized(_link.getLock()) { - if(isSuspended() && getEndpoint() != null) + ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine(); + if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting()) { updateState(State.SUSPENDED, State.ACTIVE); _transactionId = _link.getTransactionId(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 3bbfaac466..b2783a2da2 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -118,6 +118,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private NetworkConnection _network; private Sender<ByteBuffer> _sender; private Connection_1_0 _connection; + private volatile boolean _transportBlockedForWriting; static enum State { @@ -216,7 +217,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _endpoint.setProperties(serverProperties); _endpoint.setRemoteAddress(getRemoteAddress()); - _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator); + _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this); _endpoint.setConnectionEventListener(_connection); _endpoint.setFrameOutputHandler(this); @@ -529,6 +530,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + + public void close() { _sender.close(); @@ -559,4 +562,18 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { return _lastWriteTime; } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + + } + } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 2cfe431979..f8e4853099 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -728,4 +728,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { return _consumer; } + + public ConsumerTarget_1_0 getConsumerTarget() + { + return _target; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 8d71f980e5..f5827a3766 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -109,6 +109,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private final Subject _subject = new Subject(); private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); + private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; @@ -211,7 +212,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio ); sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink)); - registerConsumer(sendingLink.getConsumer()); + registerConsumer(sendingLink); link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) @@ -411,12 +412,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } } - private void registerConsumer(final ConsumerImpl consumer) + private void registerConsumer(final SendingLink_1_0 link) { + ConsumerImpl consumer = link.getConsumer(); if(consumer instanceof Consumer<?>) { Consumer<?> modelConsumer = (Consumer<?>) consumer; _consumers.add(modelConsumer); + _sendingLinks.add(link); modelConsumer.addChangeListener(_consumerClosedListener); consumerAdded(modelConsumer); } @@ -609,6 +612,20 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } @Override + public void transportStateChanged() + { + for(SendingLink_1_0 link : _sendingLinks) + { + ConsumerTarget_1_0 target = link.getConsumerTarget(); + target.flowStateChanged(); + + + } + + + } + + @Override public LogSubject getLogSubject() { return this; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java index 5c6918e87d..35d262cdb3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java @@ -30,4 +30,8 @@ public interface ServerProtocolEngine extends ProtocolEngine long getConnectionId(); Subject getSubject(); + + boolean isTransportBlockedForWriting(); + + void setTransportBlockedForWriting(boolean blocked); } |