diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java | 99 |
1 files changed, 51 insertions, 48 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 1e649c3cb7..ee1ef2418a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -29,11 +29,11 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -52,10 +52,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; -import org.apache.qpid.server.configuration.ConnectionConfig; -import org.apache.qpid.server.configuration.ConnectionConfigType; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -64,10 +61,11 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; @@ -75,13 +73,12 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionImpl; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession, ConnectionConfig +public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -115,7 +112,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private volatile boolean _closed; // maximum number of channels this session should have - private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); + private long _maxNoOfChannels; /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); @@ -142,8 +139,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private long _maxFrameSize; private final AtomicBoolean _closing = new AtomicBoolean(false); - private final UUID _qmfId; - private final ConfigStore _configStore; private long _createTime = System.currentTimeMillis(); private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; @@ -156,23 +151,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private boolean _blocking; private final Lock _receivedLock; + private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); + private final Broker _broker; - public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId) + + public AMQProtocolEngine(Broker broker, NetworkConnection network, final long connectionId) { + _broker = broker; + _maxNoOfChannels = (Integer)broker.getAttribute(Broker.SESSION_COUNT_LIMIT); _receivedLock = new ReentrantLock(); - _stateManager = new AMQStateManager(virtualHostRegistry, this); + _stateManager = new AMQStateManager(broker, this); _codecFactory = new AMQCodecFactory(true, this); setNetworkConnection(network); _connectionID = connectionId; - _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); + _actor = new AMQPConnectionActor(this, _broker.getRootMessageLogger()); _logSubject = new ConnectionLogSubject(this); - _configStore = virtualHostRegistry.getConfigStore(); - _qmfId = _configStore.createId(); - _actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false)); initialiseStatistics(); @@ -309,9 +306,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi try { + long startTime = 0; + String frameToString = null; if (_logger.isDebugEnabled()) { - _logger.debug("Frame Received: " + frame); + startTime = System.currentTimeMillis(); + frameToString = frame.toString(); + _logger.debug("RECV: " + frame); } // Check that this channel is not closing @@ -346,6 +347,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeChannel(channelId); throw e; } + + if(_logger.isDebugEnabled()) + { + _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString); + } } finally { @@ -367,7 +373,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi // This sets the protocol version (and hence framing classes) for this session. setProtocolVersion(pv); - String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager(getLocalAddress()).getMechanisms(); + String mechanisms = _broker.getSubjectCreator(getLocalAddress()).getMechanisms(); String locales = "en_US"; @@ -549,8 +555,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi final ByteBuffer buf = asByteBuffer(frame); _writtenBytes += buf.remaining(); + + if(_logger.isDebugEnabled()) + { + _logger.debug("SEND: " + frame); + } + _sender.send(buf); - _lastIoTime = System.currentTimeMillis(); + final long time = System.currentTimeMillis(); + _lastIoTime = time; + _lastWriteTime.set(time); + if(!_deferFlush) { _sender.flush(); @@ -749,7 +764,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi if (delay > 0) { _network.setMaxWriteIdle(delay); - _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); + _network.setMaxReadIdle(BrokerProperties.DEFAULT_HEART_BEAT_TIMEOUT_FACTOR * delay); } } @@ -796,8 +811,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeAllChannels(); - getConfigStore().removeConfiguredObject(this); - for (Task task : _taskList) { task.doTask(this); @@ -983,7 +996,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _virtualHost.getConnectionRegistry().registerConnection(this); - _configStore.addConfiguredObject(this); } public void addSessionCloseTask(Task task) @@ -1017,7 +1029,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public Principal getAuthorizedPrincipal() { - return _authorizedSubject == null ? null : _authorizedSubject.getPrincipals().iterator().next(); + return _authorizedSubject == null ? null : AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(_authorizedSubject); } public SocketAddress getRemoteAddress() @@ -1070,12 +1082,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void readerIdle() { - // Nothing + // TODO - enforce disconnect on lack of inbound data } public synchronized void writerIdle() { - _sender.send(asByteBuffer(HeartbeatBody.FRAME)); + writeFrame(HeartbeatBody.FRAME); } public void exception(Throwable throwable) @@ -1185,32 +1197,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return null; } - public ConfigStore getConfigStore() - { - return _configStore; - } - - public ConnectionConfigType getConfigType() - { - return ConnectionConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return getVirtualHost(); - } - public boolean isDurable() { return false; } - @Override - public UUID getQMFId() - { - return _qmfId; - } - public long getConnectionId() { return getSessionID(); @@ -1494,4 +1485,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { return _receivedLock; } + + @Override + public long getLastReadTime() + { + return _lastReceivedTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime.get(); + } } |