diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java | 226 |
1 files changed, 50 insertions, 176 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 5e95701e5a..68f1ad7942 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -33,7 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -47,18 +46,15 @@ import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; -import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogActor; @@ -78,6 +74,7 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -86,7 +83,7 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession +public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine> { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -103,9 +100,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private VirtualHost _virtualHost; - private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); + private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = + new HashMap<Integer, AMQChannel<AMQProtocolEngine>>(); - private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; + @SuppressWarnings("unchecked") + private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; /** * The channels that the latest call to {@link #received(ByteBuffer)} applied to. @@ -114,9 +113,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * * Thread-safety: guarded by {@link #_receivedLock}. */ - private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>(); - - private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); + private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage = + new HashSet<AMQChannel<AMQProtocolEngine>>(); private final AMQStateManager _stateManager; @@ -124,10 +122,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private SaslServer _saslServer; - private Object _lastReceived; - - private Object _lastSent; - private volatile boolean _closed; // maximum number of channels this session should have @@ -136,8 +130,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); - private FieldTable _clientProperties; - private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<? super AMQProtocolEngine>> _taskList = + new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>(); private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>(); private ProtocolOutputConverter _protocolOutputConverter; @@ -153,12 +147,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private long _lastIoTime; private long _writtenBytes; - private long _readBytes; - private long _maxFrameSize; private final AtomicBoolean _closing = new AtomicBoolean(false); - private long _createTime = System.currentTimeMillis(); private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; @@ -176,6 +167,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private volatile boolean _closeWhenNoRoute; private volatile boolean _stopped; + private long _readBytes; public AMQProtocolEngine(Broker broker, NetworkConnection network, @@ -258,15 +250,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi final long arrivalTime = System.currentTimeMillis(); _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; + _readBytes += msg.remaining(); _receivedLock.lock(); try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - final int len = dataBlocks.size(); - for (int i = 0; i < len; i++) + for (AMQDataBlock dataBlock : dataBlocks) { - AMQDataBlock dataBlock = dataBlocks.get(i); try { dataBlockReceived(dataBlock); @@ -316,7 +307,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private void receivedComplete() { - for (AMQChannel channel : _channelsForCurrentMessage) + for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage) { channel.receivedComplete(); } @@ -334,7 +325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi */ private void dataBlockReceived(AMQDataBlock message) throws Exception { - _lastReceived = message; if (message instanceof ProtocolInitiation) { protocolInitiationReceived((ProtocolInitiation) message); @@ -363,7 +353,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); - AMQChannel amqChannel = _channelMap.get(channelId); + AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId); if(amqChannel != null) { // The _receivedLock is already acquired in the caller @@ -558,14 +548,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { boolean wasAnyoneInterested = _stateManager.methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - for (AMQMethodListener listener : _frameListeners) - { - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - if (!wasAnyoneInterested) { throw new AMQNoMethodHandlerException(evt); @@ -611,11 +593,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (Exception e) { - for (AMQMethodListener listener : _frameListeners) - { - listener.error(e); - } - _logger.error("Unexpected exception while processing frame. Closing connection.", e); closeProtocolSession(); @@ -625,7 +602,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { - AMQChannel channel = getAndAssertChannel(channelId); + AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId); channel.publishContentHeader(body); @@ -633,7 +610,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void contentBodyReceived(int channelId, ContentBody body) throws AMQException { - AMQChannel channel = getAndAssertChannel(channelId); + AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId); channel.publishContentBody(body); } @@ -681,17 +658,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _contextKey = contextKey; } - public List<AMQChannel> getChannels() + public List<AMQChannel<AMQProtocolEngine>> getChannels() { synchronized (_channelMap) { - return new ArrayList<AMQChannel>(_channelMap.values()); + return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values()); } } - public AMQChannel getAndAssertChannel(int channelId) throws AMQException + public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException { - AMQChannel channel = getChannel(channelId); + AMQChannel<AMQProtocolEngine> channel = getChannel(channelId); if (channel == null) { throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); @@ -700,9 +677,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return channel; } - public AMQChannel getChannel(int channelId) + public AMQChannel<AMQProtocolEngine> getChannel(int channelId) { - final AMQChannel channel = + final AMQChannel<AMQProtocolEngine> channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); if ((channel == null) || channel.isClosing()) { @@ -719,7 +696,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId); } - public void addChannel(AMQChannel channel) throws AMQException + public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException { if (_closed) { @@ -770,7 +747,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = value; } - public void commitTransactions(AMQChannel channel) throws AMQException + public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException { if ((channel != null) && channel.isTransactional()) { @@ -778,7 +755,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public void rollbackTransactions(AMQChannel channel) throws AMQException + public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException { if ((channel != null) && channel.isTransactional()) { @@ -802,7 +779,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void closeChannel(int channelId, AMQConstant cause, String message) { - final AMQChannel channel = getChannel(channelId); + final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId); if (channel == null) { throw new IllegalArgumentException("Unknown channel id"); @@ -879,12 +856,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /** * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. - * - * @throws AMQException if an error occurs while closing any channel */ private void closeAllChannels() { - for (AMQChannel channel : getChannels()) + for (AMQChannel<AMQProtocolEngine> channel : getChannels()) { channel.close(); } @@ -929,9 +904,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeAllChannels(); - for (Task task : _taskList) + for (Action<? super AMQProtocolEngine> task : _taskList) { - task.doTask(this); + task.performAction(this); } synchronized(this) @@ -961,7 +936,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (InterruptedException e) { - + // do nothing } finally { @@ -1027,11 +1002,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")"); } - public String dump() - { - return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived; - } - /** @return an object that can be used to identity */ public Object getKey() { @@ -1069,10 +1039,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void setClientProperties(FieldTable clientProperties) { - _clientProperties = clientProperties; - if (_clientProperties != null) + if (clientProperties != null) { - String closeWhenNoRoute = _clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); + String closeWhenNoRoute = clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); if (closeWhenNoRoute != null) { _closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute); @@ -1082,10 +1051,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8); - _clientProduct = _clientProperties.getString(ConnectionStartProperties.PRODUCT); + _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8); + _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT); - String clientId = _clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8); + String clientId = clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8); if (clientId != null) { setContextKey(new AMQShortString(clientId)); @@ -1118,11 +1087,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _protocolVersion.getMinorVersion(); } - public boolean isProtocolVersion(byte major, byte minor) - { - return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); - } - public MethodRegistry getRegistry() { return getMethodRegistry(); @@ -1141,12 +1105,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } - public void addSessionCloseTask(Task task) + public void addDeleteTask(Action<? super AMQProtocolEngine> task) { _taskList.add(task); } - public void removeSessionCloseTask(Task task) + public void removeDeleteTask(Action<? super AMQProtocolEngine> task) { _taskList.remove(task); } @@ -1341,51 +1305,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _clientProduct; } - public String getPrincipalAsString() - { - return getAuthId(); - } - public long getSessionCountLimit() { return getMaximumNumberOfChannels(); } - public Boolean isIncoming() - { - return true; - } - - public Boolean isSystemConnection() - { - return false; - } - - public Boolean isFederationLink() - { - return false; - } - - public String getAuthId() - { - return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); - } - - public Integer getRemotePID() - { - return null; - } - - public String getRemoteProcessName() - { - return null; - } - - public Integer getRemoteParentPID() - { - return null; - } - public boolean isDurable() { return false; @@ -1401,52 +1325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getRemoteAddress()); } - public long getCreateTime() - { - return _createTime; - } - - public Boolean isShadow() - { - return false; - } - - public void mgmtClose() - { - MethodRegistry methodRegistry = getMethodRegistry(); - ConnectionCloseBody responseBody = - methodRegistry.createConnectionCloseBody( - AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("The connection was closed using the broker's management interface."), - 0,0); - - // This seems ugly but because we use closeConnection in both normal - // broker operation and as part of the management interface it cannot - // be avoided. The Current Actor will be null when this method is - // called via the QMF management interface. As such we need to set one. - boolean removeActor = false; - if (CurrentActor.get() == null) - { - removeActor = true; - CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger())); - } - - try - { - writeFrame(responseBody.generateFrame(0)); - - closeSession(); - - } - finally - { - if (removeActor) - { - CurrentActor.remove(); - } - } - } - public void mgmtCloseChannel(int channelId) { MethodRegistry methodRegistry = getMethodRegistry(); @@ -1481,14 +1359,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public String getClientID() - { - return getContextKey().toString(); - } - - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message) { - int channelId = ((AMQChannel)session).getChannelId(); + int channelId = session.getChannelId(); closeChannel(channelId, cause, message); MethodRegistry methodRegistry = getMethodRegistry(); @@ -1506,7 +1379,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getProtocolOutputConverter().getProtocolMajorVersion(), getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null)); + null)); } public void block() @@ -1516,7 +1389,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if(!_blocking) { _blocking = true; - for(AMQChannel channel : _channelMap.values()) + for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values()) { channel.block(); } @@ -1531,7 +1404,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if(_blocking) { _blocking = false; - for(AMQChannel channel : _channelMap.values()) + for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values()) { channel.unblock(); } @@ -1544,9 +1417,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _closed; } - public List<AMQSessionModel> getSessionModels() + public List<AMQChannel<AMQProtocolEngine>> getSessionModels() { - return new ArrayList<AMQSessionModel>(getChannels()); + return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels()); } public LogSubject getLogSubject() @@ -1620,14 +1493,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getContextKey()); } - public void setDeferFlush(boolean deferFlush) + @Override + public String getRemoteContainerName() { - _deferFlush = deferFlush; + return String.valueOf(getContextKey()); } - public String getUserName() + public void setDeferFlush(boolean deferFlush) { - return getAuthorizedPrincipal().getName(); + _deferFlush = deferFlush; } public final class WriteDeliverMethod |