diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java | 365 |
1 files changed, 117 insertions, 248 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index bff0a79de1..a1ffe272fd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -20,9 +20,7 @@ */ package org.apache.qpid.server.protocol; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -32,16 +30,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import javax.management.JMException; -import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; +import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; @@ -66,10 +66,12 @@ import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.pool.Job; +import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -88,22 +90,21 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.NetworkConnection; -public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig +public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + private static final AtomicLong idGenerator = new AtomicLong(0); + // to save boxing the channelId and looking up in a map... cache in an array the low numbered // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; @@ -133,7 +134,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private Object _lastSent; protected volatile boolean _closed; - + // maximum number of channels this session should have private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); @@ -145,46 +146,47 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>(); private ProtocolOutputConverter _protocolOutputConverter; - private Subject _authorizedSubject; + private Principal _authorizedID; private MethodDispatcher _dispatcher; private ProtocolSessionIdentifier _sessionIdentifier; - private final long _sessionID; + // Create a simple ID that increments for ever new Session + private final long _sessionID = idGenerator.getAndIncrement(); private AMQPConnectionActor _actor; private LogSubject _logSubject; + private NetworkDriver _networkDriver; + private long _lastIoTime; private long _writtenBytes; private long _readBytes; + private Job _readJob; + private Job _writeJob; + private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); private long _maxFrameSize; private final AtomicBoolean _closing = new AtomicBoolean(false); private final UUID _id; private final ConfigStore _configStore; private long _createTime = System.currentTimeMillis(); - private ApplicationRegistry _registry; - private boolean _statisticsEnabled = false; - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - - private NetworkConnection _network; - private Sender<ByteBuffer> _sender; - public ManagedObject getManagedObject() { return _managedObject; } - public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId) + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver) { _stateManager = new AMQStateManager(virtualHostRegistry, this); - _codecFactory = new AMQCodecFactory(true, this); + _networkDriver = driver; - setNetworkConnection(network); - _sessionID = connectionId; + _codecFactory = new AMQCodecFactory(true, this); + _poolReference.acquireExecutorService(); + _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); + _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); @@ -193,21 +195,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _configStore = virtualHostRegistry.getConfigStore(); _id = _configStore.createId(); - _actor.message(ConnectionMessages.OPEN(null, null, false, false)); - _registry = virtualHostRegistry.getApplicationRegistry(); - initialiseStatistics(); - } - - public void setNetworkConnection(NetworkConnection network) - { - setNetworkConnection(network, network.getSender()); - } + _actor.message(ConnectionMessages.OPEN(null, null, false, false)); - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) - { - _network = network; - _sender = sender; } private AMQProtocolSessionMBean createMBean() throws JMException @@ -246,18 +236,26 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - for (AMQDataBlock dataBlock : dataBlocks) + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() { - try - { - dataBlockReceived(dataBlock); - } - catch (Exception e) + public void run() { - _logger.error("Unexpected exception when processing datablock", e); - closeProtocolSession(); + // Decode buffer + + for (AMQDataBlock dataBlock : dataBlocks) + { + try + { + dataBlockReceived(dataBlock); + } + catch (Exception e) + { + _logger.error("Unexpected exception when processing datablock", e); + closeProtocolSession(); + } + } } - } + }); } catch (Exception e) { @@ -335,11 +333,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr closeChannel(channelId); throw e; } - catch (TransportException e) - { - closeChannel(channelId); - throw e; - } } finally { @@ -350,7 +343,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again - (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); + ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); try { // Log incomming protocol negotiation request @@ -370,49 +363,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr null, mechanisms.getBytes(), locales.getBytes()); - _sender.send(asByteBuffer(responseBody.generateFrame(0))); - _sender.flush(); + _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer()); } catch (AMQException e) { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()))); - _sender.flush(); - } - } - - private ByteBuffer asByteBuffer(AMQDataBlock block) - { - final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); - - try - { - block.writePayload(new DataOutputStream(new OutputStream() - { - - - @Override - public void write(int b) throws IOException - { - buf.put((byte) b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException - { - buf.put(b, off, len); - } - })); - } - catch (IOException e) - { - throw new RuntimeException(e); + _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); } - - buf.flip(); - return buf; } public void methodFrameReceived(int channelId, AMQMethodBody methodBody) @@ -467,19 +426,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr AMQConstant.CHANNEL_ERROR.getName().toString()); _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce); + closeConnection(channelId, ce, false); } } catch (AMQConnectionException e) { _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, e); + closeConnection(channelId, e, false); } catch (AMQSecurityException e) { AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); _logger.info(e.getMessage() + " whilst processing:" + methodBody); - closeConnection(channelId, ce); + closeConnection(channelId, ce, false); } } catch (Exception e) @@ -522,14 +481,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr * * @param frame the frame to write */ - public synchronized void writeFrame(AMQDataBlock frame) + public void writeFrame(AMQDataBlock frame) { _lastSent = frame; - final ByteBuffer buf = asByteBuffer(frame); + final ByteBuffer buf = frame.toNioByteBuffer(); _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); - _sender.send(buf); - _sender.flush(); + Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() + { + public void run() + { + _networkDriver.send(buf); + } + }); } public AMQShortString getContextKey() @@ -719,8 +683,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { if (delay > 0) { - _network.setMaxWriteIdle(delay); - _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); + _networkDriver.setMaxWriteIdle(delay); + _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); } } @@ -761,7 +725,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } closeAllChannels(); - + getConfigStore().removeConfiguredObject(this); if (_managedObject != null) @@ -781,6 +745,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _closed = true; notifyAll(); } + _poolReference.releaseExecutorService(); CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE()); } } @@ -803,32 +768,27 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } } - public void closeConnection(int channelId, AMQConnectionException e) throws AMQException + public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException { - try + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e); - } - - markChannelAwaitingCloseOk(channelId); - closeSession(); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); + _logger.info("Closing connection due to: " + e); } - finally + + markChannelAwaitingCloseOk(channelId); + closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame(channelId)); + + if (closeProtocolSession) { closeProtocolSession(); } - - } public void closeProtocolSession() { - _network.close(); - + _networkDriver.close(); try { _stateManager.changeState(AMQState.CONNECTION_CLOSED); @@ -837,15 +797,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _logger.info(e.getMessage()); } - catch (TransportException e) - { - _logger.info(e.getMessage()); - } } public String toString() { - return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")"); + return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); } public String dump() @@ -867,11 +823,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr */ public String getLocalFQDN() { - SocketAddress address = _network.getLocalAddress(); + SocketAddress address = _networkDriver.getLocalAddress(); + // we use the vmpipe address in some tests hence the need for this rather ugly test. The host + // information is used by SASL primary. if (address instanceof InetSocketAddress) { return ((InetSocketAddress) address).getHostName(); } + else if (address instanceof VmPipeAddress) + { + return "vmpipe:" + ((VmPipeAddress) address).getPort(); + } else { throw new IllegalArgumentException("Unsupported socket address class: " + address); @@ -950,7 +912,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public Object getClientIdentifier() { - return _network.getRemoteAddress(); + return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null; } public VirtualHost getVirtualHost() @@ -963,7 +925,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _virtualHost = virtualHost; _virtualHost.getConnectionRegistry().registerConnection(this); - + _configStore.addConfiguredObject(this); try @@ -992,33 +954,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return _protocolOutputConverter; } - public void setAuthorizedSubject(final Subject authorizedSubject) + public void setAuthorizedID(Principal authorizedID) { - if (authorizedSubject == null) - { - throw new IllegalArgumentException("authorizedSubject cannot be null"); - } - _authorizedSubject = authorizedSubject; + _authorizedID = authorizedID; } - public Subject getAuthorizedSubject() + public Principal getAuthorizedID() { - return _authorizedSubject; + return _authorizedID; } - public Principal getAuthorizedPrincipal() + public Principal getPrincipal() { - return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject); + return _authorizedID; } public SocketAddress getRemoteAddress() { - return _network.getRemoteAddress(); + return _networkDriver.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _network.getLocalAddress(); + return _networkDriver.getLocalAddress(); } public MethodRegistry getMethodRegistry() @@ -1041,10 +999,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _logger.error("Could not close protocol engine", e); } - catch (TransportException e) - { - _logger.error("Could not close protocol engine", e); - } } public void readerIdle() @@ -1052,9 +1006,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr // Nothing } + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + public void writerIdle() { - _sender.send(asByteBuffer(HeartbeatBody.FRAME)); + _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer()); } public void exception(Throwable throwable) @@ -1062,7 +1021,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr if (throwable instanceof AMQProtocolHeaderException) { writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - _sender.close(); + _networkDriver.close(); _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable); } @@ -1080,7 +1039,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr writeFrame(closeBody.generateFrame(0)); - _sender.close(); + _networkDriver.close(); } } @@ -1119,6 +1078,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return (_clientVersion == null) ? null : _clientVersion.toString(); } + public void closeIfLingeringClosedChannels() + { + for (Entry<Integer, Long>id : _closingChannelsList.entrySet()) + { + if (id.getValue() + 30000 > System.currentTimeMillis()) + { + // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection + _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed"); + closeProtocolSession(); + } + } + } + public Boolean isIncoming() { return true; @@ -1136,7 +1108,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public String getAuthId() { - return getAuthorizedPrincipal().getName(); + return getAuthorizedID().getName(); } public Integer getRemotePID() @@ -1198,7 +1170,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { return false; } - + public void mgmtClose() { MethodRegistry methodRegistry = getMethodRegistry(); @@ -1291,6 +1263,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { + closeChannel((Integer)session.getID()); MethodRegistry methodRegistry = getMethodRegistry(); @@ -1300,110 +1273,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr new AMQShortString(message), 0,0); - writeFrame(responseBody.generateFrame((Integer)session.getID())); - } - - public void close(AMQConstant cause, String message) throws AMQException - { - closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getProtocolOutputConverter().getProtocolMajorVersion(), - getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null)); - } - - public List<AMQSessionModel> getSessionModels() - { - List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); - for (AMQChannel channel : getChannels()) - { - sessions.add((AMQSessionModel) channel); - } - return sessions; - } - - public LogSubject getLogSubject() - { - return _logSubject; - } - - public void registerMessageDelivered(long messageSize) - { - if (isStatisticsEnabled()) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - } - _virtualHost.registerMessageDelivered(messageSize); - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - if (isStatisticsEnabled()) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - } - _virtualHost.registerMessageReceived(messageSize, timestamp); - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); - } - - public void initialiseStatistics() - { - setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && - _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled()); - - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); - _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); - _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); - _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); - } - - public boolean isStatisticsEnabled() - { - return _statisticsEnabled; - } - - public void setStatisticsEnabled(boolean enabled) - { - _statisticsEnabled = enabled; - } - - @Override - public boolean isSessionNameUnique(String name) - { - return true; - } - - @Override - public String getUserName() - { - return getAuthorizedPrincipal().getName(); - } + writeFrame(responseBody.generateFrame((Integer)session.getID())); + } } |