summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java226
1 files changed, 50 insertions, 176 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 5e95701e5a..68f1ad7942 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -33,7 +33,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -47,18 +46,15 @@ import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
-import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogActor;
@@ -78,6 +74,7 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -86,7 +83,7 @@ import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -103,9 +100,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private VirtualHost _virtualHost;
- private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+ private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
+ new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
- private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+ @SuppressWarnings("unchecked")
+ private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
/**
* The channels that the latest call to {@link #received(ByteBuffer)} applied to.
@@ -114,9 +113,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
*
* Thread-safety: guarded by {@link #_receivedLock}.
*/
- private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>();
-
- private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
+ private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
+ new HashSet<AMQChannel<AMQProtocolEngine>>();
private final AMQStateManager _stateManager;
@@ -124,10 +122,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private SaslServer _saslServer;
- private Object _lastReceived;
-
- private Object _lastSent;
-
private volatile boolean _closed;
// maximum number of channels this session should have
@@ -136,8 +130,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
- private FieldTable _clientProperties;
- private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<? super AMQProtocolEngine>> _taskList =
+ new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
@@ -153,12 +147,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private long _lastIoTime;
private long _writtenBytes;
- private long _readBytes;
-
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
- private long _createTime = System.currentTimeMillis();
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
@@ -176,6 +167,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private volatile boolean _closeWhenNoRoute;
private volatile boolean _stopped;
+ private long _readBytes;
public AMQProtocolEngine(Broker broker,
NetworkConnection network,
@@ -258,15 +250,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
final long arrivalTime = System.currentTimeMillis();
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
+ _readBytes += msg.remaining();
_receivedLock.lock();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- final int len = dataBlocks.size();
- for (int i = 0; i < len; i++)
+ for (AMQDataBlock dataBlock : dataBlocks)
{
- AMQDataBlock dataBlock = dataBlocks.get(i);
try
{
dataBlockReceived(dataBlock);
@@ -316,7 +307,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private void receivedComplete()
{
- for (AMQChannel channel : _channelsForCurrentMessage)
+ for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage)
{
channel.receivedComplete();
}
@@ -334,7 +325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
*/
private void dataBlockReceived(AMQDataBlock message) throws Exception
{
- _lastReceived = message;
if (message instanceof ProtocolInitiation)
{
protocolInitiationReceived((ProtocolInitiation) message);
@@ -363,7 +353,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
- AMQChannel amqChannel = _channelMap.get(channelId);
+ AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId);
if(amqChannel != null)
{
// The _receivedLock is already acquired in the caller
@@ -558,14 +548,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
- if (!_frameListeners.isEmpty())
- {
- for (AMQMethodListener listener : _frameListeners)
- {
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
-
if (!wasAnyoneInterested)
{
throw new AMQNoMethodHandlerException(evt);
@@ -611,11 +593,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
catch (Exception e)
{
- for (AMQMethodListener listener : _frameListeners)
- {
- listener.error(e);
- }
-
_logger.error("Unexpected exception while processing frame. Closing connection.", e);
closeProtocolSession();
@@ -625,7 +602,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
- AMQChannel channel = getAndAssertChannel(channelId);
+ AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
channel.publishContentHeader(body);
@@ -633,7 +610,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
- AMQChannel channel = getAndAssertChannel(channelId);
+ AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
channel.publishContentBody(body);
}
@@ -681,17 +658,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_contextKey = contextKey;
}
- public List<AMQChannel> getChannels()
+ public List<AMQChannel<AMQProtocolEngine>> getChannels()
{
synchronized (_channelMap)
{
- return new ArrayList<AMQChannel>(_channelMap.values());
+ return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values());
}
}
- public AMQChannel getAndAssertChannel(int channelId) throws AMQException
+ public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException
{
- AMQChannel channel = getChannel(channelId);
+ AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
if (channel == null)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
@@ -700,9 +677,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return channel;
}
- public AMQChannel getChannel(int channelId)
+ public AMQChannel<AMQProtocolEngine> getChannel(int channelId)
{
- final AMQChannel channel =
+ final AMQChannel<AMQProtocolEngine> channel =
((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
if ((channel == null) || channel.isClosing())
{
@@ -719,7 +696,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
}
- public void addChannel(AMQChannel channel) throws AMQException
+ public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException
{
if (_closed)
{
@@ -770,7 +747,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_maxNoOfChannels = value;
}
- public void commitTransactions(AMQChannel channel) throws AMQException
+ public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
{
@@ -778,7 +755,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- public void rollbackTransactions(AMQChannel channel) throws AMQException
+ public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
{
@@ -802,7 +779,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void closeChannel(int channelId, AMQConstant cause, String message)
{
- final AMQChannel channel = getChannel(channelId);
+ final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
if (channel == null)
{
throw new IllegalArgumentException("Unknown channel id");
@@ -879,12 +856,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
/**
* Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
- *
- * @throws AMQException if an error occurs while closing any channel
*/
private void closeAllChannels()
{
- for (AMQChannel channel : getChannels())
+ for (AMQChannel<AMQProtocolEngine> channel : getChannels())
{
channel.close();
}
@@ -929,9 +904,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeAllChannels();
- for (Task task : _taskList)
+ for (Action<? super AMQProtocolEngine> task : _taskList)
{
- task.doTask(this);
+ task.performAction(this);
}
synchronized(this)
@@ -961,7 +936,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
catch (InterruptedException e)
{
-
+ // do nothing
}
finally
{
@@ -1027,11 +1002,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
}
- public String dump()
- {
- return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
- }
-
/** @return an object that can be used to identity */
public Object getKey()
{
@@ -1069,10 +1039,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void setClientProperties(FieldTable clientProperties)
{
- _clientProperties = clientProperties;
- if (_clientProperties != null)
+ if (clientProperties != null)
{
- String closeWhenNoRoute = _clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
+ String closeWhenNoRoute = clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
if (closeWhenNoRoute != null)
{
_closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute);
@@ -1082,10 +1051,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
- _clientProduct = _clientProperties.getString(ConnectionStartProperties.PRODUCT);
+ _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
+ _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT);
- String clientId = _clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
+ String clientId = clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
if (clientId != null)
{
setContextKey(new AMQShortString(clientId));
@@ -1118,11 +1087,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _protocolVersion.getMinorVersion();
}
- public boolean isProtocolVersion(byte major, byte minor)
- {
- return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
- }
-
public MethodRegistry getRegistry()
{
return getMethodRegistry();
@@ -1141,12 +1105,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
- public void addSessionCloseTask(Task task)
+ public void addDeleteTask(Action<? super AMQProtocolEngine> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Task task)
+ public void removeDeleteTask(Action<? super AMQProtocolEngine> task)
{
_taskList.remove(task);
}
@@ -1341,51 +1305,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _clientProduct;
}
- public String getPrincipalAsString()
- {
- return getAuthId();
- }
-
public long getSessionCountLimit()
{
return getMaximumNumberOfChannels();
}
- public Boolean isIncoming()
- {
- return true;
- }
-
- public Boolean isSystemConnection()
- {
- return false;
- }
-
- public Boolean isFederationLink()
- {
- return false;
- }
-
- public String getAuthId()
- {
- return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
- }
-
- public Integer getRemotePID()
- {
- return null;
- }
-
- public String getRemoteProcessName()
- {
- return null;
- }
-
- public Integer getRemoteParentPID()
- {
- return null;
- }
-
public boolean isDurable()
{
return false;
@@ -1401,52 +1325,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return String.valueOf(getRemoteAddress());
}
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public Boolean isShadow()
- {
- return false;
- }
-
- public void mgmtClose()
- {
- MethodRegistry methodRegistry = getMethodRegistry();
- ConnectionCloseBody responseBody =
- methodRegistry.createConnectionCloseBody(
- AMQConstant.REPLY_SUCCESS.getCode(),
- new AMQShortString("The connection was closed using the broker's management interface."),
- 0,0);
-
- // This seems ugly but because we use closeConnection in both normal
- // broker operation and as part of the management interface it cannot
- // be avoided. The Current Actor will be null when this method is
- // called via the QMF management interface. As such we need to set one.
- boolean removeActor = false;
- if (CurrentActor.get() == null)
- {
- removeActor = true;
- CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
- }
-
- try
- {
- writeFrame(responseBody.generateFrame(0));
-
- closeSession();
-
- }
- finally
- {
- if (removeActor)
- {
- CurrentActor.remove();
- }
- }
- }
-
public void mgmtCloseChannel(int channelId)
{
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1481,14 +1359,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- public String getClientID()
- {
- return getContextKey().toString();
- }
-
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+ public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
{
- int channelId = ((AMQChannel)session).getChannelId();
+ int channelId = session.getChannelId();
closeChannel(channelId, cause, message);
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1506,7 +1379,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
getProtocolOutputConverter().getProtocolMajorVersion(),
getProtocolOutputConverter().getProtocolMinorVersion(),
- (Throwable) null));
+ null));
}
public void block()
@@ -1516,7 +1389,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
if(!_blocking)
{
_blocking = true;
- for(AMQChannel channel : _channelMap.values())
+ for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
{
channel.block();
}
@@ -1531,7 +1404,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
if(_blocking)
{
_blocking = false;
- for(AMQChannel channel : _channelMap.values())
+ for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
{
channel.unblock();
}
@@ -1544,9 +1417,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _closed;
}
- public List<AMQSessionModel> getSessionModels()
+ public List<AMQChannel<AMQProtocolEngine>> getSessionModels()
{
- return new ArrayList<AMQSessionModel>(getChannels());
+ return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels());
}
public LogSubject getLogSubject()
@@ -1620,14 +1493,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return String.valueOf(getContextKey());
}
- public void setDeferFlush(boolean deferFlush)
+ @Override
+ public String getRemoteContainerName()
{
- _deferFlush = deferFlush;
+ return String.valueOf(getContextKey());
}
- public String getUserName()
+ public void setDeferFlush(boolean deferFlush)
{
- return getAuthorizedPrincipal().getName();
+ _deferFlush = deferFlush;
}
public final class WriteDeliverMethod