summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java365
1 files changed, 117 insertions, 248 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index bff0a79de1..a1ffe272fd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.server.protocol;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -32,16 +30,18 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
-import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
@@ -66,10 +66,12 @@ import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -88,22 +90,21 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.NetworkConnection;
-public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -133,7 +134,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private Object _lastSent;
protected volatile boolean _closed;
-
+
// maximum number of channels this session should have
private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
@@ -145,46 +146,47 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
- private Subject _authorizedSubject;
+ private Principal _authorizedID;
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
- private final long _sessionID;
+ // Create a simple ID that increments for ever new Session
+ private final long _sessionID = idGenerator.getAndIncrement();
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
+ private NetworkDriver _networkDriver;
+
private long _lastIoTime;
private long _writtenBytes;
private long _readBytes;
+ private Job _readJob;
+ private Job _writeJob;
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
private final UUID _id;
private final ConfigStore _configStore;
private long _createTime = System.currentTimeMillis();
- private ApplicationRegistry _registry;
- private boolean _statisticsEnabled = false;
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
-
- private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
-
public ManagedObject getManagedObject()
{
return _managedObject;
}
- public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
- _codecFactory = new AMQCodecFactory(true, this);
+ _networkDriver = driver;
- setNetworkConnection(network);
- _sessionID = connectionId;
+ _codecFactory = new AMQCodecFactory(true, this);
+ _poolReference.acquireExecutorService();
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
@@ -193,21 +195,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_configStore = virtualHostRegistry.getConfigStore();
_id = _configStore.createId();
- _actor.message(ConnectionMessages.OPEN(null, null, false, false));
- _registry = virtualHostRegistry.getApplicationRegistry();
- initialiseStatistics();
- }
-
- public void setNetworkConnection(NetworkConnection network)
- {
- setNetworkConnection(network, network.getSender());
- }
+ _actor.message(ConnectionMessages.OPEN(null, null, false, false));
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
- _network = network;
- _sender = sender;
}
private AMQProtocolSessionMBean createMBean() throws JMException
@@ -246,18 +236,26 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- for (AMQDataBlock dataBlock : dataBlocks)
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
- try
- {
- dataBlockReceived(dataBlock);
- }
- catch (Exception e)
+ public void run()
{
- _logger.error("Unexpected exception when processing datablock", e);
- closeProtocolSession();
+ // Decode buffer
+
+ for (AMQDataBlock dataBlock : dataBlocks)
+ {
+ try
+ {
+ dataBlockReceived(dataBlock);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
+ }
+ }
}
- }
+ });
}
catch (Exception e)
{
@@ -335,11 +333,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
closeChannel(channelId);
throw e;
}
- catch (TransportException e)
- {
- closeChannel(channelId);
- throw e;
- }
}
finally
{
@@ -350,7 +343,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
- (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
try
{
// Log incomming protocol negotiation request
@@ -370,49 +363,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
null,
mechanisms.getBytes(),
locales.getBytes());
- _sender.send(asByteBuffer(responseBody.generateFrame(0)));
- _sender.flush();
+ _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
- _sender.flush();
- }
- }
-
- private ByteBuffer asByteBuffer(AMQDataBlock block)
- {
- final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
-
- try
- {
- block.writePayload(new DataOutputStream(new OutputStream()
- {
-
-
- @Override
- public void write(int b) throws IOException
- {
- buf.put((byte) b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- buf.put(b, off, len);
- }
- }));
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
}
-
- buf.flip();
- return buf;
}
public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
@@ -467,19 +426,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
AMQConstant.CHANNEL_ERROR.getName().toString());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce);
+ closeConnection(channelId, ce, false);
}
}
catch (AMQConnectionException e)
{
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, e);
+ closeConnection(channelId, e, false);
}
catch (AMQSecurityException e)
{
AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce);
+ closeConnection(channelId, ce, false);
}
}
catch (Exception e)
@@ -522,14 +481,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
*
* @param frame the frame to write
*/
- public synchronized void writeFrame(AMQDataBlock frame)
+ public void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
- final ByteBuffer buf = asByteBuffer(frame);
+ final ByteBuffer buf = frame.toNioByteBuffer();
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- _sender.send(buf);
- _sender.flush();
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
+ {
+ public void run()
+ {
+ _networkDriver.send(buf);
+ }
+ });
}
public AMQShortString getContextKey()
@@ -719,8 +683,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
if (delay > 0)
{
- _network.setMaxWriteIdle(delay);
- _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+ _networkDriver.setMaxWriteIdle(delay);
+ _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
}
}
@@ -761,7 +725,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
closeAllChannels();
-
+
getConfigStore().removeConfiguredObject(this);
if (_managedObject != null)
@@ -781,6 +745,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_closed = true;
notifyAll();
}
+ _poolReference.releaseExecutorService();
CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
}
}
@@ -803,32 +768,27 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
}
- public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+ public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
{
- try
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing connection due to: " + e);
- }
-
- markChannelAwaitingCloseOk(channelId);
- closeSession();
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
+ _logger.info("Closing connection due to: " + e);
}
- finally
+
+ markChannelAwaitingCloseOk(channelId);
+ closeSession();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame(channelId));
+
+ if (closeProtocolSession)
{
closeProtocolSession();
}
-
-
}
public void closeProtocolSession()
{
- _network.close();
-
+ _networkDriver.close();
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -837,15 +797,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
_logger.info(e.getMessage());
}
- catch (TransportException e)
- {
- _logger.info(e.getMessage());
- }
}
public String toString()
{
- return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
+ return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
}
public String dump()
@@ -867,11 +823,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
*/
public String getLocalFQDN()
{
- SocketAddress address = _network.getLocalAddress();
+ SocketAddress address = _networkDriver.getLocalAddress();
+ // we use the vmpipe address in some tests hence the need for this rather ugly test. The host
+ // information is used by SASL primary.
if (address instanceof InetSocketAddress)
{
return ((InetSocketAddress) address).getHostName();
}
+ else if (address instanceof VmPipeAddress)
+ {
+ return "vmpipe:" + ((VmPipeAddress) address).getPort();
+ }
else
{
throw new IllegalArgumentException("Unsupported socket address class: " + address);
@@ -950,7 +912,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public Object getClientIdentifier()
{
- return _network.getRemoteAddress();
+ return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
}
public VirtualHost getVirtualHost()
@@ -963,7 +925,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_virtualHost = virtualHost;
_virtualHost.getConnectionRegistry().registerConnection(this);
-
+
_configStore.addConfiguredObject(this);
try
@@ -992,33 +954,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return _protocolOutputConverter;
}
- public void setAuthorizedSubject(final Subject authorizedSubject)
+ public void setAuthorizedID(Principal authorizedID)
{
- if (authorizedSubject == null)
- {
- throw new IllegalArgumentException("authorizedSubject cannot be null");
- }
- _authorizedSubject = authorizedSubject;
+ _authorizedID = authorizedID;
}
- public Subject getAuthorizedSubject()
+ public Principal getAuthorizedID()
{
- return _authorizedSubject;
+ return _authorizedID;
}
- public Principal getAuthorizedPrincipal()
+ public Principal getPrincipal()
{
- return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+ return _authorizedID;
}
public SocketAddress getRemoteAddress()
{
- return _network.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _network.getLocalAddress();
+ return _networkDriver.getLocalAddress();
}
public MethodRegistry getMethodRegistry()
@@ -1041,10 +999,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
_logger.error("Could not close protocol engine", e);
}
- catch (TransportException e)
- {
- _logger.error("Could not close protocol engine", e);
- }
}
public void readerIdle()
@@ -1052,9 +1006,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
// Nothing
}
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
public void writerIdle()
{
- _sender.send(asByteBuffer(HeartbeatBody.FRAME));
+ _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
}
public void exception(Throwable throwable)
@@ -1062,7 +1021,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
if (throwable instanceof AMQProtocolHeaderException)
{
writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
- _sender.close();
+ _networkDriver.close();
_logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
}
@@ -1080,7 +1039,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
writeFrame(closeBody.generateFrame(0));
- _sender.close();
+ _networkDriver.close();
}
}
@@ -1119,6 +1078,19 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return (_clientVersion == null) ? null : _clientVersion.toString();
}
+ public void closeIfLingeringClosedChannels()
+ {
+ for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
+ {
+ if (id.getValue() + 30000 > System.currentTimeMillis())
+ {
+ // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
+ _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
+ closeProtocolSession();
+ }
+ }
+ }
+
public Boolean isIncoming()
{
return true;
@@ -1136,7 +1108,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public String getAuthId()
{
- return getAuthorizedPrincipal().getName();
+ return getAuthorizedID().getName();
}
public Integer getRemotePID()
@@ -1198,7 +1170,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
return false;
}
-
+
public void mgmtClose()
{
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1291,6 +1263,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
+
closeChannel((Integer)session.getID());
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1300,110 +1273,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
new AMQShortString(message),
0,0);
- writeFrame(responseBody.generateFrame((Integer)session.getID()));
- }
-
- public void close(AMQConstant cause, String message) throws AMQException
- {
- closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
- getProtocolOutputConverter().getProtocolMajorVersion(),
- getProtocolOutputConverter().getProtocolMinorVersion(),
- (Throwable) null));
- }
-
- public List<AMQSessionModel> getSessionModels()
- {
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
- for (AMQChannel channel : getChannels())
- {
- sessions.add((AMQSessionModel) channel);
- }
- return sessions;
- }
-
- public LogSubject getLogSubject()
- {
- return _logSubject;
- }
-
- public void registerMessageDelivered(long messageSize)
- {
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
- _virtualHost.registerMessageDelivered(messageSize);
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
- _virtualHost.registerMessageReceived(messageSize, timestamp);
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
- }
-
- public void initialiseStatistics()
- {
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
- _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
- _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
- _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
- }
-
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
-
- @Override
- public boolean isSessionNameUnique(String name)
- {
- return true;
- }
-
- @Override
- public String getUserName()
- {
- return getAuthorizedPrincipal().getName();
- }
+ writeFrame(responseBody.generateFrame((Integer)session.getID()));
+ }
}