summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-10 18:10:16 +0000
committerKeith Wall <kwall@apache.org>2015-02-10 18:10:16 +0000
commitf5ee46517eb096030a6c44b14b801eb2aaeb9392 (patch)
tree25544486642cc770061489663dba650d85769404 /qpid/java/common
parent085486ebe5ff21133b9caf1c31625ac6ea356568 (diff)
downloadqpid-python-f5ee46517eb096030a6c44b14b801eb2aaeb9392.tar.gz
Refactoring: make the queue no longer be responsible for pushing messages onto the wire
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java25
2 files changed, 22 insertions, 9 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
index 35d262cdb3..df4d7c7721 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
@@ -34,4 +34,10 @@ public interface ServerProtocolEngine extends ProtocolEngine
boolean isTransportBlockedForWriting();
void setTransportBlockedForWriting(boolean blocked);
+
+ void setMessageAssignmentSuspended(boolean value);
+
+ boolean isMessageAssignmentSuspended();
+
+ void processPendingMessages();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index 6599f4443c..c17ee500b0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -61,7 +61,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
private final String _remoteSocketAddress;
private final AtomicBoolean _closed = new AtomicBoolean(false);
- private final ServerProtocolEngine _receiver;
+ private final ServerProtocolEngine _protocolEngine;
private final int _receiveBufSize;
private final Ticker _ticker;
private final Set<TransportEncryption> _encryptionSet;
@@ -81,7 +81,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
public NonBlockingSenderReceiver(final NonBlockingConnection connection,
- ServerProtocolEngine receiver,
+ ServerProtocolEngine protocolEngine,
int receiveBufSize,
Ticker ticker,
final Set<TransportEncryption> encryptionSet,
@@ -94,7 +94,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
{
_connection = connection;
_socketChannel = connection.getSocketChannel();
- _receiver = receiver;
+ _protocolEngine = protocolEngine;
_receiveBufSize = receiveBufSize;
_ticker = ticker;
_encryptionSet = encryptionSet;
@@ -170,15 +170,22 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
_ticker.tick(currentTime);
}
- _receiver.setTransportBlockedForWriting(!doWrite());
+ _protocolEngine.setMessageAssignmentSuspended(true);
+
+ _protocolEngine.processPendingMessages();
+
+ _protocolEngine.setTransportBlockedForWriting(!doWrite());
boolean dataRead = doRead();
_fullyWritten = doWrite();
- _receiver.setTransportBlockedForWriting(!_fullyWritten);
+ _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0))
{
_stateChanged.set(true);
}
+
+ // tell all consumer targets that it is okay to accept more
+ _protocolEngine.setMessageAssignmentSuspended(false);
}
catch (IOException e)
{
@@ -213,7 +220,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
}
LOGGER.debug("Closing receiver");
- _receiver.closed();
+ _protocolEngine.closed();
try
{
@@ -373,7 +380,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
ByteBuffer dup = _currentBuffer.duplicate();
dup.flip();
_currentBuffer = _currentBuffer.slice();
- _receiver.received(dup);
+ _protocolEngine.received(dup);
}
}
else if(_transportEncryption == TransportEncryption.TLS)
@@ -414,7 +421,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
{
readData = true;
}
- _receiver.received(appInputBuffer);
+ _protocolEngine.received(appInputBuffer);
}
while(unwrapped > 0 || tasksRun);
@@ -451,7 +458,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
if (_transportEncryption == TransportEncryption.NONE)
{
- _receiver.received(_netInputBuffer);
+ _protocolEngine.received(_netInputBuffer);
}
else
{