diff options
author | Keith Wall <kwall@apache.org> | 2015-02-10 18:10:16 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-02-10 18:10:16 +0000 |
commit | f5ee46517eb096030a6c44b14b801eb2aaeb9392 (patch) | |
tree | 25544486642cc770061489663dba650d85769404 /qpid/java/common | |
parent | 085486ebe5ff21133b9caf1c31625ac6ea356568 (diff) | |
download | qpid-python-f5ee46517eb096030a6c44b14b801eb2aaeb9392.tar.gz |
Refactoring: make the queue no longer be responsible for pushing messages onto the wire
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
2 files changed, 22 insertions, 9 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java index 35d262cdb3..df4d7c7721 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java @@ -34,4 +34,10 @@ public interface ServerProtocolEngine extends ProtocolEngine boolean isTransportBlockedForWriting(); void setTransportBlockedForWriting(boolean blocked); + + void setMessageAssignmentSuspended(boolean value); + + boolean isMessageAssignmentSuspended(); + + void processPendingMessages(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index 6599f4443c..c17ee500b0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -61,7 +61,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender private final String _remoteSocketAddress; private final AtomicBoolean _closed = new AtomicBoolean(false); - private final ServerProtocolEngine _receiver; + private final ServerProtocolEngine _protocolEngine; private final int _receiveBufSize; private final Ticker _ticker; private final Set<TransportEncryption> _encryptionSet; @@ -81,7 +81,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender public NonBlockingSenderReceiver(final NonBlockingConnection connection, - ServerProtocolEngine receiver, + ServerProtocolEngine protocolEngine, int receiveBufSize, Ticker ticker, final Set<TransportEncryption> encryptionSet, @@ -94,7 +94,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender { _connection = connection; _socketChannel = connection.getSocketChannel(); - _receiver = receiver; + _protocolEngine = protocolEngine; _receiveBufSize = receiveBufSize; _ticker = ticker; _encryptionSet = encryptionSet; @@ -170,15 +170,22 @@ public class NonBlockingSenderReceiver implements ByteBufferSender _ticker.tick(currentTime); } - _receiver.setTransportBlockedForWriting(!doWrite()); + _protocolEngine.setMessageAssignmentSuspended(true); + + _protocolEngine.processPendingMessages(); + + _protocolEngine.setTransportBlockedForWriting(!doWrite()); boolean dataRead = doRead(); _fullyWritten = doWrite(); - _receiver.setTransportBlockedForWriting(!_fullyWritten); + _protocolEngine.setTransportBlockedForWriting(!_fullyWritten); if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)) { _stateChanged.set(true); } + + // tell all consumer targets that it is okay to accept more + _protocolEngine.setMessageAssignmentSuspended(false); } catch (IOException e) { @@ -213,7 +220,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender } LOGGER.debug("Closing receiver"); - _receiver.closed(); + _protocolEngine.closed(); try { @@ -373,7 +380,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender ByteBuffer dup = _currentBuffer.duplicate(); dup.flip(); _currentBuffer = _currentBuffer.slice(); - _receiver.received(dup); + _protocolEngine.received(dup); } } else if(_transportEncryption == TransportEncryption.TLS) @@ -414,7 +421,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender { readData = true; } - _receiver.received(appInputBuffer); + _protocolEngine.received(appInputBuffer); } while(unwrapped > 0 || tasksRun); @@ -451,7 +458,7 @@ public class NonBlockingSenderReceiver implements ByteBufferSender if (_transportEncryption == TransportEncryption.NONE) { - _receiver.received(_netInputBuffer); + _protocolEngine.received(_netInputBuffer); } else { |