summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java246
1 files changed, 137 insertions, 109 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index b750b29952..1e649c3cb7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -20,8 +20,26 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.security.auth.Subject;
+import javax.security.sasl.SaslServer;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
@@ -46,13 +64,10 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -66,25 +81,7 @@ import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-import javax.management.JMException;
-import javax.security.auth.Subject;
-import javax.security.sasl.SaslServer;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -109,8 +106,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private AMQCodecFactory _codecFactory;
- private AMQProtocolSessionMBean _managedObject;
-
private SaslServer _saslServer;
private Object _lastReceived;
@@ -147,12 +142,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
- private final UUID _id;
+ private final UUID _qmfId;
private final ConfigStore _configStore;
private long _createTime = System.currentTimeMillis();
- private ApplicationRegistry _registry;
- private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private NetworkConnection _network;
@@ -160,14 +153,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private volatile boolean _deferFlush;
private long _lastReceivedTime;
+ private boolean _blocking;
- public ManagedObject getManagedObject()
- {
- return _managedObject;
- }
+ private final Lock _receivedLock;
public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
{
+ _receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_codecFactory = new AMQCodecFactory(true, this);
@@ -179,12 +171,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_logSubject = new ConnectionLogSubject(this);
_configStore = virtualHostRegistry.getConfigStore();
- _id = _configStore.createId();
+ _qmfId = _configStore.createId();
_actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false));
- _registry = virtualHostRegistry.getApplicationRegistry();
initialiseStatistics();
+
}
public void setNetworkConnection(NetworkConnection network)
@@ -198,11 +190,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_sender = sender;
}
- private AMQProtocolSessionMBean createMBean() throws JMException
- {
- return new AMQProtocolSessionMBean(this);
- }
-
public long getSessionID()
{
return _connectionID;
@@ -244,6 +231,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
final long arrivalTime = System.currentTimeMillis();
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
+
+ _receivedLock.lock();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -268,6 +257,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_logger.error("Unexpected exception when processing datablock", e);
closeProtocolSession();
}
+ finally
+ {
+ _receivedLock.unlock();
+ }
}
private void receiveComplete()
@@ -374,7 +367,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
// This sets the protocol version (and hence framing classes) for this session.
setProtocolVersion(pv);
- String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+ String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager(getLocalAddress()).getMechanisms();
String locales = "en_US";
@@ -576,7 +569,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public List<AMQChannel> getChannels()
{
- return new ArrayList<AMQChannel>(_channelMap.values());
+ synchronized (_channelMap)
+ {
+ return new ArrayList<AMQChannel>(_channelMap.values());
+ }
}
public AMQChannel getAndAssertChannel(int channelId) throws AMQException
@@ -633,24 +629,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
else
{
- _channelMap.put(channel.getChannelId(), channel);
+ synchronized (_channelMap)
+ {
+ _channelMap.put(channel.getChannelId(), channel);
+
+ if(_blocking)
+ {
+ channel.block();
+ }
+ }
}
if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
{
_cachedChannels[channelId] = channel;
}
-
- checkForNotification();
- }
-
- private void checkForNotification()
- {
- int channelsCount = _channelMap.size();
- if (_managedObject != null && channelsCount >= _maxNoOfChannels)
- {
- _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
- }
}
public Long getMaximumNumberOfChannels()
@@ -735,10 +728,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
*/
public void removeChannel(int channelId)
{
- _channelMap.remove(channelId);
- if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ synchronized (_channelMap)
{
- _cachedChannels[channelId] = null;
+ _channelMap.remove(channelId);
+
+ if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ {
+ _cachedChannels[channelId] = null;
+ }
}
}
@@ -763,12 +760,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
*/
private void closeAllChannels() throws AMQException
{
- for (AMQChannel channel : _channelMap.values())
+ for (AMQChannel channel : getChannels())
{
channel.close();
}
-
- _channelMap.clear();
+ synchronized (_channelMap)
+ {
+ _channelMap.clear();
+ }
for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
{
_cachedChannels[i] = null;
@@ -780,6 +779,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
if(_closing.compareAndSet(false,true))
{
+ // force sync of outstanding async work
+ receiveComplete();
+
// REMOVE THIS SHOULD NOT BE HERE.
if (CurrentActor.get() == null)
{
@@ -796,13 +798,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
getConfigStore().removeConfiguredObject(this);
- if (_managedObject != null)
- {
- _managedObject.unregister();
- // Ensure we only do this once.
- _managedObject = null;
- }
-
for (Task task : _taskList)
{
task.doTask(this);
@@ -835,7 +830,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
}
- public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+ private void closeConnection(int channelId, AMQConnectionException e) throws AMQException
{
try
{
@@ -846,12 +841,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
markChannelAwaitingCloseOk(channelId);
closeSession();
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
}
finally
{
- closeProtocolSession();
+ try
+ {
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame(channelId));
+ }
+ finally
+ {
+ closeProtocolSession();
+ }
}
@@ -983,16 +984,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_virtualHost.getConnectionRegistry().registerConnection(this);
_configStore.addConfiguredObject(this);
-
- try
- {
- _managedObject = createMBean();
- _managedObject.register();
- }
- catch (JMException e)
- {
- _logger.error(e);
- }
}
public void addSessionCloseTask(Task task)
@@ -1026,7 +1017,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public Principal getAuthorizedPrincipal()
{
- return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+ return _authorizedSubject == null ? null : _authorizedSubject.getPrincipals().iterator().next();
}
public SocketAddress getRemoteAddress()
@@ -1039,6 +1030,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return _network.getLocalAddress();
}
+ public Principal getPeerPrincipal()
+ {
+ return _network.getPeerPrincipal();
+ }
+
public MethodRegistry getMethodRegistry()
{
return _methodRegistry;
@@ -1144,6 +1140,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return _clientVersion;
}
+ public String getPrincipalAsString()
+ {
+ return getAuthId();
+ }
+
+ public long getSessionCountLimit()
+ {
+ return getMaximumNumberOfChannels();
+ }
+
public Boolean isIncoming()
{
return true;
@@ -1199,9 +1205,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return false;
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
public long getConnectionId()
@@ -1337,6 +1344,36 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
(Throwable) null));
}
+ public void block()
+ {
+ synchronized (_channelMap)
+ {
+ if(!_blocking)
+ {
+ _blocking = true;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.block();
+ }
+ }
+ }
+ }
+
+ public void unblock()
+ {
+ synchronized (_channelMap)
+ {
+ if(_blocking)
+ {
+ _blocking = false;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.unblock();
+ }
+ }
+ }
+ }
+
public boolean isClosed()
{
return _closed;
@@ -1344,12 +1381,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public List<AMQSessionModel> getSessionModels()
{
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
- for (AMQChannel channel : getChannels())
- {
- sessions.add((AMQSessionModel) channel);
- }
- return sessions;
+ return new ArrayList<AMQSessionModel>(getChannels());
}
public LogSubject getLogSubject()
@@ -1359,21 +1391,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public void registerMessageDelivered(long messageSize)
{
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
_virtualHost.registerMessageDelivered(messageSize);
}
public void registerMessageReceived(long messageSize, long timestamp)
{
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
_virtualHost.registerMessageReceived(messageSize, timestamp);
}
@@ -1407,29 +1433,26 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public void initialiseStatistics()
{
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
_dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
_messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
_dataReceived = new StatisticsCounter("data-received-" + getSessionID());
}
- public boolean isStatisticsEnabled()
+ public boolean isSessionNameUnique(byte[] name)
{
- return _statisticsEnabled;
+ // 0-8/0-9/0-9-1 sessions don't have names
+ return true;
}
- public void setStatisticsEnabled(boolean enabled)
+ public String getRemoteAddressString()
{
- _statisticsEnabled = enabled;
+ return String.valueOf(getRemoteAddress());
}
- public boolean isSessionNameUnique(byte[] name)
+ public String getClientId()
{
- // 0-8/0-9/0-9-1 sessions don't have names
- return true;
+ return String.valueOf(getContextKey());
}
public void setDeferFlush(boolean deferFlush)
@@ -1466,4 +1489,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
return _reference;
}
+
+ public Lock getReceivedLock()
+ {
+ return _receivedLock;
+ }
}