summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-12-11 17:14:34 +0000
committerKeith Wall <kwall@apache.org>2014-12-11 17:14:34 +0000
commitd9514d45f11c92aef06b8b880e291f76fdbff2a2 (patch)
tree410c7d1ca116e70d0777b4c3a9e5a204d287432e
parent66c5a217c9891e1dfee14eeb093866373293f265 (diff)
downloadqpid-python-d9514d45f11c92aef06b8b880e291f76fdbff2a2.tar.gz
Extend credit managers to be aware of transport backpressue
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644704 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java87
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java90
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java34
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java40
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java5
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java15
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java21
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java38
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java20
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java17
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java17
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java)21
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java56
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java)40
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java)14
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java20
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java10
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java19
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java21
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java4
29 files changed, 328 insertions, 319 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
deleted file mode 100644
index be3a13d2d3..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class BytesOnlyCreditManager extends AbstractFlowCreditManager
-{
- private final AtomicLong _bytesCredit;
-
- public BytesOnlyCreditManager(long initialCredit)
- {
- _bytesCredit = new AtomicLong(initialCredit);
- }
-
- public long getMessageCredit()
- {
- return -1L;
- }
-
- public long getBytesCredit()
- {
- return _bytesCredit.get();
- }
-
- public void restoreCredit(long messageCredit, long bytesCredit)
- {
- _bytesCredit.addAndGet(bytesCredit);
- setSuspended(false);
- }
-
- public void removeAllCredit()
- {
- _bytesCredit.set(0L);
- }
-
- public boolean hasCredit()
- {
- return _bytesCredit.get() > 0L;
- }
-
- public boolean useCreditForMessage(long msgSize)
- {
- if(hasCredit())
- {
- if(_bytesCredit.addAndGet(-msgSize) >= 0)
- {
- return true;
- }
- else
- {
- _bytesCredit.addAndGet(msgSize);
- setSuspended(true);
- return false;
- }
- }
- else
- {
- return false;
- }
-
- }
-
- public void setBytesCredit(long bytesCredit)
- {
- _bytesCredit.set( bytesCredit );
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
index 280f2851a4..08aac0b511 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
@@ -24,10 +24,6 @@ package org.apache.qpid.server.flow;
public interface FlowCreditManager
{
- long getMessageCredit();
-
- long getBytesCredit();
-
public static interface FlowCreditManagerListener
{
void creditStateChanged(boolean hasCredit);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
deleted file mode 100644
index 31c1fda968..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-
-public class MessageAndBytesCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
-{
- private long _messageCredit;
- private long _bytesCredit;
-
- public MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit)
- {
- _messageCredit = messageCredit;
- _bytesCredit = bytesCredit;
- }
-
- public synchronized long getMessageCredit()
- {
- return _messageCredit;
- }
-
- public synchronized long getBytesCredit()
- {
- return _bytesCredit;
- }
-
- public synchronized void restoreCredit(long messageCredit, long bytesCredit)
- {
- _messageCredit += messageCredit;
- _bytesCredit += bytesCredit;
- setSuspended(hasCredit());
- }
-
- public synchronized void removeAllCredit()
- {
- _messageCredit = 0L;
- _bytesCredit = 0L;
- setSuspended(true);
- }
-
- public synchronized boolean hasCredit()
- {
- return (_messageCredit > 0L) && ( _bytesCredit > 0L );
- }
-
- public synchronized boolean useCreditForMessage(final long msgSize)
- {
- if(_messageCredit == 0L)
- {
- setSuspended(true);
- return false;
- }
- else
- {
- if(msgSize > _bytesCredit)
- {
- setSuspended(true);
- return false;
- }
- _messageCredit--;
- _bytesCredit -= msgSize;
- setSuspended(false);
- return true;
- }
-
- }
-
- public synchronized void setBytesCredit(long bytesCredit)
- {
- _bytesCredit = bytesCredit;
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index f13af479ad..40aa1bbafd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -113,4 +113,6 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo
* @return the time of the last activity or 0 if not in a transaction
*/
long getTransactionUpdateTime();
+
+ void transportStateChanged();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 6ea9f3600c..3c25e0934c 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -157,6 +157,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getSubject();
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _delegate.isTransportBlockedForWriting();
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _delegate.setTransportBlockedForWriting(blocked);
+ }
+
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
@@ -268,6 +280,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
return new Subject();
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return false;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ }
}
private class SelfDelegateProtocolEngine implements ServerProtocolEngine
@@ -408,6 +431,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getSubject();
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return false;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ }
+
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 1c42d9b6fe..47ed224133 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -456,6 +456,12 @@ public class MockConsumer implements ConsumerTarget
{
return 0;
}
+
+ @Override
+ public void transportStateChanged()
+ {
+
+ }
}
private static class MockConnectionModel implements AMQConnectionModel
@@ -663,5 +669,7 @@ public class MockConsumer implements ConsumerTarget
{
}
+
+
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 89d681111b..afa4fb8bc0 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -158,6 +158,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
return _name;
}
+ public void transportStateChanged()
+ {
+ _creditManager.restoreCredit(0, 0);
+ }
public static class AddMessageDispositionListenerAction implements Runnable
{
@@ -555,10 +559,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
switch(flowMode)
{
case CREDIT:
- _creditManager = new CreditCreditManager(0l,0l);
+ _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
break;
case WINDOW:
- _creditManager = new WindowCreditManager(0l,0l);
+ _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
break;
default:
// this should never happen, as 0-10 is finalised and so the enum should never change
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
index 8dddac9809..e670c1f88b 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
@@ -21,48 +21,27 @@
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
+ private final ServerProtocolEngine _serverProtocolEngine;
private volatile long _bytesCredit;
private volatile long _messageCredit;
- public CreditCreditManager(long bytesCredit, long messageCredit)
+ public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine)
{
+ _serverProtocolEngine = serverProtocolEngine;
_bytesCredit = bytesCredit;
_messageCredit = messageCredit;
setSuspended(!hasCredit());
}
-
- public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit)
- {
- _bytesCredit = bytesCredit;
- _messageCredit = messageCredit;
-
- setSuspended(!hasCredit());
-
- }
-
-
- public long getMessageCredit()
- {
- return _messageCredit == -1L
- ? Long.MAX_VALUE
- : _messageCredit;
- }
-
- public long getBytesCredit()
- {
- return _bytesCredit == -1L
- ? Long.MAX_VALUE
- : _bytesCredit;
- }
-
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
+ setSuspended(!hasCredit());
}
@@ -107,12 +86,17 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
public synchronized boolean hasCredit()
{
// Note !=, if credit is < 0 that indicates infinite credit
- return (_bytesCredit != 0L && _messageCredit != 0L);
+ return (_bytesCredit != 0L && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting());
}
public synchronized boolean useCreditForMessage(long msgSize)
{
- if(_messageCredit >= 0L)
+ if (_serverProtocolEngine.isTransportBlockedForWriting())
+ {
+ setSuspended(true);
+ return false;
+ }
+ else if(_messageCredit >= 0L)
{
if(_messageCredit > 0)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
index 30aecdb2d2..5c919252b8 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
@@ -86,7 +86,10 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator
conn.setRemoteAddress(network.getRemoteAddress());
conn.setLocalAddress(network.getLocalAddress());
- return new ProtocolEngine_0_10( conn, network);
+ ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network);
+ conn.setProtocolEngine(protocolEngine);
+
+ return protocolEngine;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 4adf472c5d..cb96870e74 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -54,6 +54,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private long _createTime = System.currentTimeMillis();
private long _lastReadTime = _createTime;
private long _lastWriteTime = _createTime;
+ private volatile boolean _transportBlockedForWriting;
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network)
@@ -249,4 +250,18 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
return _connection.getAuthorizedSubject();
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ _connection.transportStateChanged();
+ }
+
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 8567be37f0..cbd569d036 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
@@ -90,6 +91,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private int _messageCompressionThreshold;
private int _maxMessageSize;
+ private ServerProtocolEngine _serverProtocolEngine;
+
public ServerConnection(final long connectionId,
Broker<?> broker,
final AmqpPort<?> port,
@@ -189,6 +192,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
super.setConnectionDelegate(delegate);
}
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return _serverProtocolEngine;
+ }
+
+ public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine)
+ {
+ _serverProtocolEngine = serverProtocolEngine;
+ }
+
public VirtualHostImpl<?,?,?> getVirtualHost()
{
return _virtualHost;
@@ -664,4 +677,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
return _maxMessageSize;
}
+
+ public void transportStateChanged()
+ {
+ for (AMQSessionModel ssn : getSessionModels())
+ {
+ ssn.transportStateChanged();
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 223de4f84e..1d8676edd6 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -874,6 +875,15 @@ public class ServerSession extends Session
}
@Override
+ public void transportStateChanged()
+ {
+ for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
+ {
+ consumerTarget.transportStateChanged();
+ }
+ }
+
+ @Override
public Object getConnectionReference()
{
return getConnection().getReference();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index a117ddb0c6..8fdee7a0f7 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -35,6 +35,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -244,8 +245,8 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
-
- FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
+ ServerProtocolEngine serverProtocolEngine = getServerConnection(session).getProtocolEngine();
+ FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, serverProtocolEngine);
FilterManager filterManager = null;
try
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index 8e48741b91..e11d2ce9bb 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -21,11 +21,14 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
+ private final ServerProtocolEngine _serverProtocolEngine;
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -33,39 +36,22 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
private volatile long _bytesUsed;
private volatile long _messageUsed;
- public WindowCreditManager()
- {
- this(0L, 0L);
- }
-
- public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit)
+ public WindowCreditManager(long bytesCreditLimit,
+ long messageCreditLimit,
+ ServerProtocolEngine serverProtocolEngine)
{
+ _serverProtocolEngine = serverProtocolEngine;
_bytesCreditLimit = bytesCreditLimit;
_messageCreditLimit = messageCreditLimit;
setSuspended(!hasCredit());
}
- public long getBytesCreditLimit()
- {
- return _bytesCreditLimit;
- }
-
public long getMessageCreditLimit()
{
return _messageCreditLimit;
}
- public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
- {
- _bytesCreditLimit = bytesCreditLimit;
- _messageCreditLimit = messageCreditLimit;
-
- setSuspended(!hasCredit());
-
- }
-
-
public long getMessageCredit()
{
return _messageCreditLimit == -1L
@@ -121,12 +107,18 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
public synchronized boolean hasCredit()
{
return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
- && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
+ && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed)
+ && !_serverProtocolEngine.isTransportBlockedForWriting();
}
public synchronized boolean useCreditForMessage(final long msgSize)
{
- if(_messageCreditLimit >= 0L)
+ if (_serverProtocolEngine.isTransportBlockedForWriting())
+ {
+ setSuspended(true);
+ return false;
+ }
+ else if(_messageCreditLimit >= 0L)
{
if(_messageUsed < _messageCreditLimit)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
index 1c4a694be6..b05edc5d04 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
@@ -20,17 +20,25 @@
*/
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.server.protocol.v0_10.WindowCreditManager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.test.utils.QpidTestCase;
public class WindowCreditManagerTest extends QpidTestCase
{
private WindowCreditManager _creditManager;
+ private ServerProtocolEngine _protocolEngine;
protected void setUp() throws Exception
{
super.setUp();
- _creditManager = new WindowCreditManager();
+
+ _protocolEngine = mock(ServerProtocolEngine.class);
+ when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false);
+
+ _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine);
}
/**
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 7604662980..9a6059ccbf 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -66,8 +66,6 @@ import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -131,7 +129,8 @@ public class AMQChannel
private final int _channelId;
- private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
+ private final Pre0_10CreditManager _creditManager;
+ private final FlowCreditManager _noAckCreditManager;
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -213,6 +212,9 @@ public class AMQChannel
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
{
+ _creditManager = new Pre0_10CreditManager(0l,0l, connection);
+ _noAckCreditManager = new NoAckCreditManager(connection);
+
_connection = connection;
_channelId = channelId;
@@ -699,7 +701,7 @@ public class AMQChannel
if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
- target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _noAckCreditManager);
}
else if(acks)
{
@@ -709,7 +711,7 @@ public class AMQChannel
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _noAckCreditManager);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
@@ -1644,6 +1646,7 @@ public class AMQChannel
}
}
+
public synchronized void block(AMQQueue queue)
{
if(_blockingEntities.add(queue))
@@ -1672,6 +1675,13 @@ public class AMQChannel
}
@Override
+ public void transportStateChanged()
+ {
+ _creditManager.restoreCredit(0, 0);
+ _noAckCreditManager.restoreCredit(0, 0);
+ }
+
+ @Override
public Object getConnectionReference()
{
return getConnection().getReference();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 606649445d..cea9b0930f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -188,6 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private int _currentMethodId;
private int _binaryDataLimit;
private long _maxMessageSize;
+ private volatile boolean _transportBlockedForWriting;
public AMQProtocolEngine(Broker<?> broker,
final NetworkConnection network,
@@ -250,6 +251,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _authorizedSubject;
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.transportStateChanged();
+ }
+ }
+
public void setNetworkConnection(NetworkConnection network)
{
setNetworkConnection(network, network.getSender());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 43982db2fd..d6642aef2e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -136,12 +136,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
- @Override
- public boolean allocateCredit(ServerMessage msg)
- {
- return true;
- }
-
}
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
@@ -215,12 +209,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
- @Override
- public boolean allocateCredit(ServerMessage msg)
- {
- return true;
- }
-
private static final ServerTransaction.Action NOOP =
new ServerTransaction.Action()
{
@@ -250,11 +238,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
- public boolean allocateCredit(ServerMessage msg)
- {
- return getCreditManager().useCreditForMessage(msg.getSize());
- }
-
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
index 1817e8ad31..af54c911dc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
@@ -18,10 +18,13 @@
* under the License.
*
*/
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
private final AtomicLong _messageCredit;
@@ -31,16 +34,6 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen
_messageCredit = new AtomicLong(initialCredit);
}
- public long getMessageCredit()
- {
- return _messageCredit.get();
- }
-
- public long getBytesCredit()
- {
- return -1L;
- }
-
public void restoreCredit(long messageCredit, long bytesCredit)
{
_messageCredit.addAndGet(messageCredit);
@@ -48,12 +41,6 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen
}
- public void removeAllCredit()
- {
- setSuspended(true);
- _messageCredit.set(0L);
- }
-
public boolean hasCredit()
{
return _messageCredit.get() > 0L;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
new file mode 100644
index 0000000000..2d32617106
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+
+public class NoAckCreditManager extends AbstractFlowCreditManager
+{
+ private final ServerProtocolEngine _serverProtocolEngine;
+
+ public NoAckCreditManager(ServerProtocolEngine serverProtocolEngine)
+ {
+ _serverProtocolEngine = serverProtocolEngine;
+ }
+
+ @Override
+ public void restoreCredit(final long messageCredit, final long bytesCredit)
+ {
+ setSuspended(!hasCredit());
+ }
+
+ @Override
+ public boolean hasCredit()
+ {
+ return !_serverProtocolEngine.isTransportBlockedForWriting();
+ }
+
+ @Override
+ public boolean useCreditForMessage(final long msgSize)
+ {
+ if (!hasCredit())
+ {
+ setSuspended(true);
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
index fc2d4bfb53..e63645ed09 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
@@ -18,20 +18,28 @@
* under the License.
*
*/
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
+ private final ServerProtocolEngine _protocolEngine;
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
private volatile long _bytesCredit;
private volatile long _messageCredit;
- public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit)
+ public Pre0_10CreditManager(long bytesCreditLimit,
+ long messageCreditLimit,
+ ServerProtocolEngine protocolEngine)
{
+ _protocolEngine = protocolEngine;
_bytesCreditLimit = bytesCreditLimit;
_messageCreditLimit = messageCreditLimit;
_bytesCredit = bytesCreditLimit;
@@ -39,6 +47,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
}
+
public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
{
long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit;
@@ -80,16 +89,6 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
}
- public long getMessageCredit()
- {
- return _messageCredit;
- }
-
- public long getBytesCredit()
- {
- return _bytesCredit;
- }
-
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
final long messageCreditLimit = _messageCreditLimit;
@@ -119,22 +118,21 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
}
- public synchronized void removeAllCredit()
- {
- _bytesCredit = 0L;
- _messageCredit = 0L;
- setSuspended(!hasCredit());
- }
-
public synchronized boolean hasCredit()
{
return (_bytesCreditLimit == 0L || _bytesCredit > 0)
- && (_messageCreditLimit == 0L || _messageCredit > 0);
+ && (_messageCreditLimit == 0L || _messageCredit > 0)
+ && !_protocolEngine.isTransportBlockedForWriting();
}
public synchronized boolean useCreditForMessage(final long msgSize)
{
- if(_messageCreditLimit != 0L)
+ if (_protocolEngine.isTransportBlockedForWriting())
+ {
+ setSuspended(true);
+ return false;
+ }
+ else if(_messageCreditLimit != 0L)
{
if(_messageCredit != 0L)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 9326f16703..55fc865850 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -31,8 +31,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoredMessage;
@@ -328,7 +326,7 @@ public class AckTest extends QpidTestCase
public void testMessageDequeueRestoresCreditTest() throws Exception
{
// Send 10 messages
- Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+ Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1, _protocolEngine);
_subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
index 89fc60666b..c4c89ac24a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
@@ -18,20 +18,14 @@
* under the License.
*
*/
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
- public long getMessageCredit()
- {
- return -1L;
- }
-
- public long getBytesCredit()
- {
- return -1L;
- }
public void restoreCredit(long messageCredit, long bytesCredit)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 8e24d55da0..b55bd03a91 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -44,6 +44,7 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.End;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Broker;
@@ -64,6 +65,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private final AmqpPort<?> _port;
private final Broker<?> _broker;
private final SubjectCreator _subjectCreator;
+ private final ServerProtocolEngine _protocolEngine;
private VirtualHostImpl _vhost;
private final Transport _transport;
private final ConnectionEndpoint _conn;
@@ -101,12 +103,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private boolean _closedOnOpen;
+
public Connection_1_0(Broker<?> broker,
ConnectionEndpoint conn,
long connectionId,
AmqpPort<?> port,
- Transport transport, final SubjectCreator subjectCreator)
+ Transport transport,
+ final SubjectCreator subjectCreator,
+ final ServerProtocolEngine protocolEngine)
{
+ _protocolEngine = protocolEngine;
_broker = broker;
_port = port;
_transport = transport;
@@ -363,6 +369,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
return _port;
}
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return _protocolEngine;
+ }
+
@Override
public Transport getTransport()
{
@@ -480,4 +491,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
+ public void transportStateChanged()
+ {
+ for (Session_1_0 session : _sessions)
+ {
+ session.transportStateChanged();
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index c5d9a5e35d..b5e1bdafbb 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -40,6 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -84,7 +85,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
public boolean isSuspended()
{
- return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend();
+ return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;
}
@@ -290,7 +291,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
synchronized (_link.getLock())
{
- final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+
+ ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
if(!hasCredit && getState() == State.ACTIVE)
{
suspend();
@@ -330,7 +333,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
synchronized(_link.getLock())
{
- if(isSuspended() && getEndpoint() != null)
+ ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
{
updateState(State.SUSPENDED, State.ACTIVE);
_transactionId = _link.getTransactionId();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 3bbfaac466..b2783a2da2 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -118,6 +118,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
private Connection_1_0 _connection;
+ private volatile boolean _transportBlockedForWriting;
static enum State {
@@ -216,7 +217,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
_endpoint.setProperties(serverProperties);
_endpoint.setRemoteAddress(getRemoteAddress());
- _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator);
+ _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this);
_endpoint.setConnectionEventListener(_connection);
_endpoint.setFrameOutputHandler(this);
@@ -529,6 +530,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
+
+
public void close()
{
_sender.close();
@@ -559,4 +562,18 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
return _lastWriteTime;
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ _connection.transportStateChanged();
+
+ }
+
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 2cfe431979..f8e4853099 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -728,4 +728,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
return _consumer;
}
+
+ public ConsumerTarget_1_0 getConsumerTarget()
+ {
+ return _target;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 8d71f980e5..f5827a3766 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -109,6 +109,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
private final Subject _subject = new Subject();
private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
@@ -211,7 +212,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
);
sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
- registerConsumer(sendingLink.getConsumer());
+ registerConsumer(sendingLink);
link = sendingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
@@ -411,12 +412,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
}
- private void registerConsumer(final ConsumerImpl consumer)
+ private void registerConsumer(final SendingLink_1_0 link)
{
+ ConsumerImpl consumer = link.getConsumer();
if(consumer instanceof Consumer<?>)
{
Consumer<?> modelConsumer = (Consumer<?>) consumer;
_consumers.add(modelConsumer);
+ _sendingLinks.add(link);
modelConsumer.addChangeListener(_consumerClosedListener);
consumerAdded(modelConsumer);
}
@@ -609,6 +612,20 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
@Override
+ public void transportStateChanged()
+ {
+ for(SendingLink_1_0 link : _sendingLinks)
+ {
+ ConsumerTarget_1_0 target = link.getConsumerTarget();
+ target.flowStateChanged();
+
+
+ }
+
+
+ }
+
+ @Override
public LogSubject getLogSubject()
{
return this;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
index 5c6918e87d..35d262cdb3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
@@ -30,4 +30,8 @@ public interface ServerProtocolEngine extends ProtocolEngine
long getConnectionId();
Subject getSubject();
+
+ boolean isTransportBlockedForWriting();
+
+ void setTransportBlockedForWriting(boolean blocked);
}