diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java')
-rwxr-xr-x | qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java | 126 |
1 files changed, 103 insertions, 23 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 854cd388b9..e391bd6771 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -24,18 +24,23 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Constant; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; @@ -52,13 +57,20 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private ServerConnection _connection; private long _createTime = System.currentTimeMillis(); - private long _lastReadTime; - private long _lastWriteTime; + private long _lastReadTime = _createTime; + private long _lastWriteTime = _createTime; + private volatile boolean _transportBlockedForWriting; + + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); + + private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); + public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) { - super(new Assembler(conn)); + super(new ServerAssembler(conn)); _connection = conn; if(network != null) @@ -67,7 +79,33 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + @Override + public boolean isMessageAssignmentSuspended() + { + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); + + if(!messageAssignmentSuspended) + { + for(AMQSessionModel<?,?> session : _connection.getSessionModels()) + { + for(Consumer<?> consumer : session.getConsumers()) + { + ((ConsumerImpl)consumer).getTarget().notifyCurrentState(); + } + } + } + } + + + + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) { @@ -87,7 +125,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _network = network; _connection.setNetworkConnection(network); - Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); + ServerDisassembler disassembler = new ServerDisassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); _connection.setSender(disassembler); _connection.addFrameSizeObserver(disassembler); // FIXME Two log messages to maintain compatibility with earlier protocol versions @@ -96,23 +134,15 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender) + private ByteBufferSender wrapSender(final ByteBufferSender sender) { - return new Sender<ByteBuffer>() + return new ByteBufferSender() { @Override - public void setIdleTimeout(int i) - { - sender.setIdleTimeout(i); - - } - - @Override public void send(ByteBuffer msg) { _lastWriteTime = System.currentTimeMillis(); sender.send(msg); - } @Override @@ -190,6 +220,11 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return _writtenBytes; } + @Override + public void encryptedTransport() + { + } + public void writerIdle() { _connection.doHeartBeat(); @@ -215,11 +250,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return getRemoteAddress().toString(); } - public String getAuthId() - { - return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName(); - } - public boolean isDurable() { return false; @@ -246,4 +276,54 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { return _connection.getAuthorizedSubject(); } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + } + + @Override + public void processPending() + { + _connection.processPending(); + + } + + @Override + public boolean hasWork() + { + return _stateChanged.get(); + } + + @Override + public void notifyWork() + { + _stateChanged.set(true); + + final Action<ServerProtocolEngine> listener = _workListener.get(); + if(listener != null) + { + listener.performAction(this); + } + } + + @Override + public void clearWork() + { + _stateChanged.set(false); + } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + } } |