summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java99
1 files changed, 51 insertions, 48 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 1e649c3cb7..ee1ef2418a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -29,11 +29,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -52,10 +52,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.ConnectionConfig;
-import org.apache.qpid.server.configuration.ConnectionConfigType;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -64,10 +61,11 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -75,13 +73,12 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -115,7 +112,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private volatile boolean _closed;
// maximum number of channels this session should have
- private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
+ private long _maxNoOfChannels;
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
@@ -142,8 +139,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
- private final UUID _qmfId;
- private final ConfigStore _configStore;
private long _createTime = System.currentTimeMillis();
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
@@ -156,23 +151,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private boolean _blocking;
private final Lock _receivedLock;
+ private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
+ private final Broker _broker;
- public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
+
+ public AMQProtocolEngine(Broker broker, NetworkConnection network, final long connectionId)
{
+ _broker = broker;
+ _maxNoOfChannels = (Integer)broker.getAttribute(Broker.SESSION_COUNT_LIMIT);
_receivedLock = new ReentrantLock();
- _stateManager = new AMQStateManager(virtualHostRegistry, this);
+ _stateManager = new AMQStateManager(broker, this);
_codecFactory = new AMQCodecFactory(true, this);
setNetworkConnection(network);
_connectionID = connectionId;
- _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+ _actor = new AMQPConnectionActor(this, _broker.getRootMessageLogger());
_logSubject = new ConnectionLogSubject(this);
- _configStore = virtualHostRegistry.getConfigStore();
- _qmfId = _configStore.createId();
-
_actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false));
initialiseStatistics();
@@ -309,9 +306,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
try
{
+ long startTime = 0;
+ String frameToString = null;
if (_logger.isDebugEnabled())
{
- _logger.debug("Frame Received: " + frame);
+ startTime = System.currentTimeMillis();
+ frameToString = frame.toString();
+ _logger.debug("RECV: " + frame);
}
// Check that this channel is not closing
@@ -346,6 +347,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeChannel(channelId);
throw e;
}
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
+ }
}
finally
{
@@ -367,7 +373,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
// This sets the protocol version (and hence framing classes) for this session.
setProtocolVersion(pv);
- String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager(getLocalAddress()).getMechanisms();
+ String mechanisms = _broker.getSubjectCreator(getLocalAddress()).getMechanisms();
String locales = "en_US";
@@ -549,8 +555,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("SEND: " + frame);
+ }
+
_sender.send(buf);
- _lastIoTime = System.currentTimeMillis();
+ final long time = System.currentTimeMillis();
+ _lastIoTime = time;
+ _lastWriteTime.set(time);
+
if(!_deferFlush)
{
_sender.flush();
@@ -749,7 +764,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
if (delay > 0)
{
_network.setMaxWriteIdle(delay);
- _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+ _network.setMaxReadIdle(BrokerProperties.DEFAULT_HEART_BEAT_TIMEOUT_FACTOR * delay);
}
}
@@ -796,8 +811,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeAllChannels();
- getConfigStore().removeConfiguredObject(this);
-
for (Task task : _taskList)
{
task.doTask(this);
@@ -983,7 +996,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_virtualHost.getConnectionRegistry().registerConnection(this);
- _configStore.addConfiguredObject(this);
}
public void addSessionCloseTask(Task task)
@@ -1017,7 +1029,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public Principal getAuthorizedPrincipal()
{
- return _authorizedSubject == null ? null : _authorizedSubject.getPrincipals().iterator().next();
+ return _authorizedSubject == null ? null : AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(_authorizedSubject);
}
public SocketAddress getRemoteAddress()
@@ -1070,12 +1082,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void readerIdle()
{
- // Nothing
+ // TODO - enforce disconnect on lack of inbound data
}
public synchronized void writerIdle()
{
- _sender.send(asByteBuffer(HeartbeatBody.FRAME));
+ writeFrame(HeartbeatBody.FRAME);
}
public void exception(Throwable throwable)
@@ -1185,32 +1197,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return null;
}
- public ConfigStore getConfigStore()
- {
- return _configStore;
- }
-
- public ConnectionConfigType getConfigType()
- {
- return ConnectionConfigType.getInstance();
- }
-
- public ConfiguredObject getParent()
- {
- return getVirtualHost();
- }
-
public boolean isDurable()
{
return false;
}
- @Override
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
public long getConnectionId()
{
return getSessionID();
@@ -1494,4 +1485,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
return _receivedLock;
}
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReceivedTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime.get();
+ }
}