diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java | 246 |
1 files changed, 137 insertions, 109 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index b750b29952..1e649c3cb7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -20,8 +20,26 @@ */ package org.apache.qpid.server.protocol; -import org.apache.log4j.Logger; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import javax.security.auth.Subject; +import javax.security.sasl.SaslServer; +import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; @@ -46,13 +64,10 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; @@ -66,25 +81,7 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -import javax.management.JMException; -import javax.security.auth.Subject; -import javax.security.sasl.SaslServer; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicBoolean; - -public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig +public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession, ConnectionConfig { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -109,8 +106,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private AMQCodecFactory _codecFactory; - private AMQProtocolSessionMBean _managedObject; - private SaslServer _saslServer; private Object _lastReceived; @@ -147,12 +142,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private long _maxFrameSize; private final AtomicBoolean _closing = new AtomicBoolean(false); - private final UUID _id; + private final UUID _qmfId; private final ConfigStore _configStore; private long _createTime = System.currentTimeMillis(); - private ApplicationRegistry _registry; - private boolean _statisticsEnabled = false; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private NetworkConnection _network; @@ -160,14 +153,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private volatile boolean _deferFlush; private long _lastReceivedTime; + private boolean _blocking; - public ManagedObject getManagedObject() - { - return _managedObject; - } + private final Lock _receivedLock; public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId) { + _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(virtualHostRegistry, this); _codecFactory = new AMQCodecFactory(true, this); @@ -179,12 +171,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _logSubject = new ConnectionLogSubject(this); _configStore = virtualHostRegistry.getConfigStore(); - _id = _configStore.createId(); + _qmfId = _configStore.createId(); _actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false)); - _registry = virtualHostRegistry.getApplicationRegistry(); initialiseStatistics(); + } public void setNetworkConnection(NetworkConnection network) @@ -198,11 +190,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _sender = sender; } - private AMQProtocolSessionMBean createMBean() throws JMException - { - return new AMQProtocolSessionMBean(this); - } - public long getSessionID() { return _connectionID; @@ -244,6 +231,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr final long arrivalTime = System.currentTimeMillis(); _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; + + _receivedLock.lock(); try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); @@ -268,6 +257,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _logger.error("Unexpected exception when processing datablock", e); closeProtocolSession(); } + finally + { + _receivedLock.unlock(); + } } private void receiveComplete() @@ -374,7 +367,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr // This sets the protocol version (and hence framing classes) for this session. setProtocolVersion(pv); - String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); + String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager(getLocalAddress()).getMechanisms(); String locales = "en_US"; @@ -576,7 +569,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public List<AMQChannel> getChannels() { - return new ArrayList<AMQChannel>(_channelMap.values()); + synchronized (_channelMap) + { + return new ArrayList<AMQChannel>(_channelMap.values()); + } } public AMQChannel getAndAssertChannel(int channelId) throws AMQException @@ -633,24 +629,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } else { - _channelMap.put(channel.getChannelId(), channel); + synchronized (_channelMap) + { + _channelMap.put(channel.getChannelId(), channel); + + if(_blocking) + { + channel.block(); + } + } } if (((channelId & CHANNEL_CACHE_SIZE) == channelId)) { _cachedChannels[channelId] = channel; } - - checkForNotification(); - } - - private void checkForNotification() - { - int channelsCount = _channelMap.size(); - if (_managedObject != null && channelsCount >= _maxNoOfChannels) - { - _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value"); - } } public Long getMaximumNumberOfChannels() @@ -735,10 +728,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr */ public void removeChannel(int channelId) { - _channelMap.remove(channelId); - if ((channelId & CHANNEL_CACHE_SIZE) == channelId) + synchronized (_channelMap) { - _cachedChannels[channelId] = null; + _channelMap.remove(channelId); + + if ((channelId & CHANNEL_CACHE_SIZE) == channelId) + { + _cachedChannels[channelId] = null; + } } } @@ -763,12 +760,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr */ private void closeAllChannels() throws AMQException { - for (AMQChannel channel : _channelMap.values()) + for (AMQChannel channel : getChannels()) { channel.close(); } - - _channelMap.clear(); + synchronized (_channelMap) + { + _channelMap.clear(); + } for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) { _cachedChannels[i] = null; @@ -780,6 +779,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { if(_closing.compareAndSet(false,true)) { + // force sync of outstanding async work + receiveComplete(); + // REMOVE THIS SHOULD NOT BE HERE. if (CurrentActor.get() == null) { @@ -796,13 +798,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr getConfigStore().removeConfiguredObject(this); - if (_managedObject != null) - { - _managedObject.unregister(); - // Ensure we only do this once. - _managedObject = null; - } - for (Task task : _taskList) { task.doTask(this); @@ -835,7 +830,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } } - public void closeConnection(int channelId, AMQConnectionException e) throws AMQException + private void closeConnection(int channelId, AMQConnectionException e) throws AMQException { try { @@ -846,12 +841,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr markChannelAwaitingCloseOk(channelId); closeSession(); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); } finally { - closeProtocolSession(); + try + { + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame(channelId)); + } + finally + { + closeProtocolSession(); + } } @@ -983,16 +984,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _virtualHost.getConnectionRegistry().registerConnection(this); _configStore.addConfiguredObject(this); - - try - { - _managedObject = createMBean(); - _managedObject.register(); - } - catch (JMException e) - { - _logger.error(e); - } } public void addSessionCloseTask(Task task) @@ -1026,7 +1017,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public Principal getAuthorizedPrincipal() { - return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject); + return _authorizedSubject == null ? null : _authorizedSubject.getPrincipals().iterator().next(); } public SocketAddress getRemoteAddress() @@ -1039,6 +1030,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return _network.getLocalAddress(); } + public Principal getPeerPrincipal() + { + return _network.getPeerPrincipal(); + } + public MethodRegistry getMethodRegistry() { return _methodRegistry; @@ -1144,6 +1140,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return _clientVersion; } + public String getPrincipalAsString() + { + return getAuthId(); + } + + public long getSessionCountLimit() + { + return getMaximumNumberOfChannels(); + } + public Boolean isIncoming() { return true; @@ -1199,9 +1205,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr return false; } - public UUID getId() + @Override + public UUID getQMFId() { - return _id; + return _qmfId; } public long getConnectionId() @@ -1337,6 +1344,36 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr (Throwable) null)); } + public void block() + { + synchronized (_channelMap) + { + if(!_blocking) + { + _blocking = true; + for(AMQChannel channel : _channelMap.values()) + { + channel.block(); + } + } + } + } + + public void unblock() + { + synchronized (_channelMap) + { + if(_blocking) + { + _blocking = false; + for(AMQChannel channel : _channelMap.values()) + { + channel.unblock(); + } + } + } + } + public boolean isClosed() { return _closed; @@ -1344,12 +1381,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public List<AMQSessionModel> getSessionModels() { - List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); - for (AMQChannel channel : getChannels()) - { - sessions.add((AMQSessionModel) channel); - } - return sessions; + return new ArrayList<AMQSessionModel>(getChannels()); } public LogSubject getLogSubject() @@ -1359,21 +1391,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public void registerMessageDelivered(long messageSize) { - if (isStatisticsEnabled()) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - } + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); _virtualHost.registerMessageDelivered(messageSize); } public void registerMessageReceived(long messageSize, long timestamp) { - if (isStatisticsEnabled()) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - } + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); _virtualHost.registerMessageReceived(messageSize, timestamp); } @@ -1407,29 +1433,26 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public void initialiseStatistics() { - setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && - _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled()); - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); } - public boolean isStatisticsEnabled() + public boolean isSessionNameUnique(byte[] name) { - return _statisticsEnabled; + // 0-8/0-9/0-9-1 sessions don't have names + return true; } - public void setStatisticsEnabled(boolean enabled) + public String getRemoteAddressString() { - _statisticsEnabled = enabled; + return String.valueOf(getRemoteAddress()); } - public boolean isSessionNameUnique(byte[] name) + public String getClientId() { - // 0-8/0-9/0-9-1 sessions don't have names - return true; + return String.valueOf(getContextKey()); } public void setDeferFlush(boolean deferFlush) @@ -1466,4 +1489,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { return _reference; } + + public Lock getReceivedLock() + { + return _receivedLock; + } } |