diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java | 287 |
1 files changed, 174 insertions, 113 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index cb145aac88..d7b5b00b26 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -36,13 +36,14 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import javax.security.sasl.SaslException; @@ -58,7 +59,7 @@ import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; @@ -69,6 +70,7 @@ import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Transport; @@ -85,7 +87,7 @@ import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; @@ -96,6 +98,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQConnectionModel<AMQProtocolEngine, AMQChannel>, ServerMethodProcessor<ServerChannelMethodProcessor> { + + + enum ConnectionState { INIT, @@ -117,6 +122,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private static final long AWAIT_CLOSED_TIMEOUT = 60000; private final AmqpPort<?> _port; private final long _creationTime; + private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); private AMQShortString _contextKey; @@ -139,11 +146,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, * The channels that the latest call to {@link #received(ByteBuffer)} applied to. * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()} * on after handling the frames. - * - * Thread-safety: guarded by {@link #_receivedLock}. */ - private final Set<AMQChannel> _channelsForCurrentMessage = - new HashSet<>(); + private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>(); private AMQDecoder _decoder; @@ -157,9 +161,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); - private final List<Action<? super AMQProtocolEngine>> _taskList = + private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList = new CopyOnWriteArrayList<>(); + private final Queue<Action<? super AMQProtocolEngine>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>(); private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); @@ -179,13 +186,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private volatile boolean _deferFlush; - private long _lastReceivedTime; + private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want? private boolean _blocking; - private final ReentrantLock _receivedLock; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); private final Broker<?> _broker; private final Transport _transport; @@ -200,6 +206,34 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private int _currentMethodId; private int _binaryDataLimit; private long _maxMessageSize; + private volatile boolean _transportBlockedForWriting; + + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); + + + @Override + public boolean isMessageAssignmentSuspended() + { + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); + if(!messageAssignmentSuspended) + { + for(AMQSessionModel<?,?> session : getSessionModels()) + { + for(Consumer<?> consumer : session.getConsumers()) + { + ((ConsumerImpl)consumer).getTarget().notifyCurrentState(); + } + } + } + } + public AMQProtocolEngine(Broker<?> broker, final NetworkConnection network, @@ -211,7 +245,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _port = port; _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); - _receivedLock = new ReentrantLock(); _decoder = new BrokerDecoder(this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -262,12 +295,28 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _authorizedSubject; } + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + for(AMQChannel channel : _channelMap.values()) + { + channel.transportStateChanged(); + } + } + public void setNetworkConnection(NetworkConnection network) { setNetworkConnection(network, network.getSender()); } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { _network = network; _sender = sender; @@ -294,10 +343,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _closing.get(); } - public synchronized void flushBatched() - { - _sender.flush(); - } public ClientDeliveryMethod createDeliveryMethod(int channelId) @@ -314,9 +359,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { final long arrivalTime = System.currentTimeMillis(); - if(!_authenticated && - (arrivalTime - _creationTime) > _port.getContextValue(Long.class, - Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) + if (!_authenticated && + (arrivalTime - _creationTime) > _port.getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) { _logger.warn("Connection has taken more than " + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) @@ -328,7 +373,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _lastIoTime = arrivalTime; _readBytes += msg.remaining(); - _receivedLock.lock(); try { _decoder.decodeBuffer(msg); @@ -371,7 +415,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } catch (StoreException e) { - if(_virtualHost.getState() == State.ACTIVE) + if (_virtualHost.getState() == State.ACTIVE) { throw e; } @@ -380,10 +424,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _logger.error("Store Exception ignored as virtual host no longer active", e); } } - finally - { - _receivedLock.unlock(); - } return null; } }); @@ -484,64 +524,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, writeFrame(responseBody.generateFrame(0)); _state = ConnectionState.AWAIT_START_OK; + _sender.flush(); + } catch (AMQException e) { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + _sender.flush(); } } private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; - private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes); private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); - private ByteBuffer asByteBuffer(AMQDataBlock block) - { - final int size = (int) block.getSize(); - - final byte[] data; - - - if(size > REUSABLE_BYTE_BUFFER_CAPACITY) - { - data= new byte[size]; - } - else - { - - data = _reusableBytes; - } - _reusableDataOutput.setBuffer(data); - - try - { - block.writePayload(_reusableDataOutput); - } - catch (IOException e) - { - throw new ServerScopedRuntimeException(e); - } - - final ByteBuffer buf; - - if(size <= REUSABLE_BYTE_BUFFER_CAPACITY) - { - buf = _reusableByteBuffer; - buf.position(0); - } - else - { - buf = ByteBuffer.wrap(data); - } - buf.limit(_reusableDataOutput.length()); - - return buf; - } - - /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). @@ -550,16 +548,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine, */ public synchronized void writeFrame(AMQDataBlock frame) { - - final ByteBuffer buf = asByteBuffer(frame); - _writtenBytes += buf.remaining(); - if(_logger.isDebugEnabled()) { _logger.debug("SEND: " + frame); } - _sender.send(buf); + try + { + _writtenBytes += frame.writePayload(_sender); + } + catch (IOException e) + { + throw new ServerScopedRuntimeException(e); + } + + final long time = System.currentTimeMillis(); _lastIoTime = time; _lastWriteTime.set(time); @@ -796,14 +799,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, if(_closing.compareAndSet(false,true)) { // force sync of outstanding async work - _receivedLock.lock(); try { receivedComplete(); } finally { - _receivedLock.unlock(); + finishClose(connectionDropped); } @@ -845,7 +847,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { try { - for (Action<? super AMQProtocolEngine> task : _taskList) + for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList) { task.performAction(this); } @@ -867,17 +869,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { synchronized(this) { - final boolean lockHeld = _receivedLock.isHeldByCurrentThread(); final long endTime = System.currentTimeMillis() + AWAIT_CLOSED_TIMEOUT; while(!_closed && endTime > System.currentTimeMillis()) { try { - if(lockHeld) - { - _receivedLock.unlock(); - } wait(1000); } catch (InterruptedException e) @@ -885,13 +882,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Thread.currentThread().interrupt(); break; } - finally - { - if(lockHeld) - { - _receivedLock.lock(); - } - } } if (!_closed) @@ -1088,12 +1078,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public void addDeleteTask(Action<? super AMQProtocolEngine> task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); } public void removeDeleteTask(Action<? super AMQProtocolEngine> task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public ProtocolOutputConverter getProtocolOutputConverter() @@ -1171,6 +1161,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } + @Override + public void encryptedTransport() + { + } + public void readerIdle() { Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>() @@ -1323,26 +1318,50 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return String.valueOf(getRemoteAddress()); } - public void closeSession(AMQChannel session, AMQConstant cause, String message) + public void closeSessionAsync(final AMQChannel session, final AMQConstant cause, final String message) { - int channelId = session.getChannelId(); - closeChannel(channelId, cause, message); + addAsyncTask(new Action<AMQProtocolEngine>() + { - MethodRegistry methodRegistry = getMethodRegistry(); - ChannelCloseBody responseBody = - methodRegistry.createChannelCloseBody( - cause.getCode(), - AMQShortString.validValueOf(message), - 0, 0); + @Override + public void performAction(final AMQProtocolEngine object) + { + int channelId = session.getChannelId(); + closeChannel(channelId, cause, message); + + MethodRegistry methodRegistry = getMethodRegistry(); + ChannelCloseBody responseBody = + methodRegistry.createChannelCloseBody( + cause.getCode(), + AMQShortString.validValueOf(message), + 0, 0); + + writeFrame(responseBody.generateFrame(channelId)); + } + }); - writeFrame(responseBody.generateFrame(channelId)); } - public void close(AMQConstant cause, String message) + public void closeAsync(final AMQConstant cause, final String message) { - closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getMethodRegistry(), - null)); + Action<AMQProtocolEngine> action = new Action<AMQProtocolEngine>() + { + @Override + public void performAction(final AMQProtocolEngine object) + { + closeConnection(0, new AMQConnectionException(cause, message, 0, 0, + getMethodRegistry(), + null)); + + } + }; + addAsyncTask(action); + } + + private void addAsyncTask(final Action<AMQProtocolEngine> action) + { + _asyncTaskList.add(action); + notifyWork(); } public void block() @@ -1922,11 +1941,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _reference; } - public Lock getReceivedLock() - { - return _receivedLock; - } - @Override public long getLastReadTime() { @@ -2045,4 +2059,51 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _closing.get(); } + @Override + public void processPending() + { + + + while(_asyncTaskList.peek() != null) + { + Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPending(); + } + } + + @Override + public boolean hasWork() + { + return _stateChanged.get(); + } + + @Override + public void notifyWork() + { + _stateChanged.set(true); + + final Action<ServerProtocolEngine> listener = _workListener.get(); + if(listener != null) + { + + listener.performAction(this); + } + } + + @Override + public void clearWork() + { + _stateChanged.set(false); + } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + } } |